This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f5faae0 [SAMZA-2557] Adding support for nested rows access via dot
path. (#1386)
f5faae0 is described below
commit f5faae09fa45844ba55326a7f10a475d9e1ee8e3
Author: Slim Bouguerra <[email protected]>
AuthorDate: Wed Jul 22 08:52:05 2020 -0700
[SAMZA-2557] Adding support for nested rows access via dot path. (#1386)
* Working version still need to work on extracting nested fields udf
* working version end to end with Filter optimization
* left outer join test with filters
* adding more comments
* fix the type converter used by udfs
* refix the test
* Added GetNestedField built-in operator to support backward
compatibility
* fix java doc and minor change on the type cast
Not sure what this test is testing for it is a regular join between Stream
and local table
* Adding more tests and some logging to help read the compiled code
* Adding some Type sanity to the Join functions
* fix minor WAR
* revert unwanted changes
* fix the tests
* Adding more tests for map type and fix the Avro conversion for union types
with only one member type
* fix the style checks
---
.../apache/samza/sql/avro/AvroRelConverter.java | 10 +-
.../apache/samza/sql/avro/AvroTypeFactoryImpl.java | 6 +-
.../org/apache/samza/sql/fn/GetNestedFieldUdf.java | 42 ----
.../java/org/apache/samza/sql/planner/Checker.java | 11 +-
.../org/apache/samza/sql/planner/QueryPlanner.java | 92 ++++-----
.../samza/sql/planner/RelSchemaConverter.java | 7 +
.../samza/sql/planner/SamzaSqlOperatorTable.java | 2 +
.../sql/runner/SamzaSqlApplicationRunner.java | 1 +
.../samza/sql/translator/FilterTranslator.java | 4 +-
.../apache/samza/sql/translator/JoinInputNode.java | 10 +-
.../samza/sql/translator/JoinTranslator.java | 30 ++-
.../sql/translator/MessageStreamCollector.java | 214 +++++++++++++++++++++
.../samza/sql/translator/ProjectTranslator.java | 139 ++++++++++++-
.../SamzaSqlRemoteTableJoinFunction.java | 49 ++++-
.../sql/translator/SamzaSqlTableJoinFunction.java | 8 +-
.../samza/sql/translator/ScanTranslator.java | 3 +
.../samza/sql/translator/TranslatorContext.java | 5 +-
.../org/apache/samza/sql/udf/GetNestedField.java | 171 ++++++++++++++++
.../apache/samza/sql/planner/TestQueryPlanner.java | 12 +-
.../samza/sql/system/TestAvroSystemFactory.java | 3 +
.../sql/translator/TestProjectTranslator.java | 20 +-
.../samza/sql/translator/TestQueryTranslator.java | 48 ++---
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 82 +++++++-
.../test/samzasql/TestSamzaSqlRemoteTable.java | 80 +++++++-
24 files changed, 864 insertions(+), 185 deletions(-)
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index b7cee00..c4138ea 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -201,10 +201,12 @@ public class AvroRelConverter implements
SamzaRelConverter {
.collect(Collectors.toList());
return avroList;
case MAP:
- return ((Map<String, ?>) relObj).entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> convertToAvroObject(e.getValue(),
getNonNullUnionSchema(schema).getValueType())));
+ // If you ask why not using String and that is because some strings
are Wrapped into org.apache.avro.util.Utf8
+ // TODO looking at the Utf8 code base it is not immutable, having it
as a key is calling for trouble!
+ final Map<Object, Object> outputMap = new HashMap<>();
+ ((Map<Object, Object>) relObj).forEach((key, aValue) ->
outputMap.put(key,
+ convertToAvroObject(aValue,
getNonNullUnionSchema(schema).getValueType())));
+ return outputMap;
case UNION:
for (Schema unionSchema : schema.getTypes()) {
if (isSchemaCompatibleWithRelObj(relObj, unionSchema)) {
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 5dddf15..d837e03 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -148,8 +148,10 @@ public class AvroTypeFactoryImpl extends
SqlTypeFactoryImpl {
}
private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types, boolean
isNullable, boolean isOptional) {
- // Typically a nullable field's schema is configured as an union of Null
and a Type.
- if (types.size() == 2) {
+ if (types.size() == 1) {
+ return convertField(types.get(0), true, true);
+ } else if (types.size() == 2) {
+ // Typically a nullable field's schema is configured as an union of Null
and a Type.
if (types.get(0).getType() == Schema.Type.NULL) {
return convertField(types.get(1), true, true);
} else if (types.get(1).getType() == Schema.Type.NULL) {
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
deleted file mode 100644
index 4ef2a11..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.fn;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
-import org.apache.samza.sql.schema.SamzaSqlFieldType;
-import org.apache.samza.sql.udfs.SamzaSqlUdf;
-import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
-import org.apache.samza.sql.udfs.ScalarUdf;
-
-
-@SamzaSqlUdf(name = "GetNestedField", description = "UDF that extracts a field
value from a nested SamzaSqlRelRecord")
-public class GetNestedFieldUdf implements ScalarUdf {
- @Override
- public void init(Config udfConfig, Context context) {
- }
-
- @SamzaSqlUdfMethod(params = {SamzaSqlFieldType.ANY,
SamzaSqlFieldType.STRING},
- returns = SamzaSqlFieldType.ANY)
- public Object execute(Object currentFieldOrValue, String fieldName) {
- GetSqlFieldUdf udf = new GetSqlFieldUdf();
- return udf.getSqlField(currentFieldOrValue, fieldName);
- }
-}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
index ccbee6a..60794ef 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
@@ -98,11 +98,11 @@ class Checker implements SqlOperandTypeChecker {
if (parsedSqlArgType.getSqlTypeName() == SqlTypeName.CHAR &&
udfArgumentAsSqlType == SqlTypeName.VARCHAR) {
return true;
} else if (!Objects.equals(parsedSqlArgType.getSqlTypeName(),
udfArgumentAsSqlType)
- &&
!ANY_SQL_TYPE_NAMES.contains(parsedSqlArgType.getSqlTypeName()) &&
hasOneUdfMethod(udfMetadata)) {
+ && !ANY_SQL_TYPE_NAMES.contains(parsedSqlArgType.getSqlTypeName())
&& hasOneUdfMethod(udfMetadata)) {
// 3(b). Throw up and fail on mismatch between the SamzaSqlType and
CalciteType for any argument.
- String msg = String.format("Type mismatch in udf class: %s at
argument index: %d." +
- "Expected type: %s, actual type: %s.",
udfMetadata.getName(),
- udfArgumentIndex, parsedSqlArgType.getSqlTypeName(),
udfArgumentAsSqlType);
+ String msg = String.format(
+ "Type mismatch in udf class: %s at argument index: %d." +
"Expected type: %s, actual type: %s.",
+ udfMetadata.getName(), udfArgumentIndex, udfArgumentAsSqlType,
parsedSqlArgType.getSqlTypeName());
LOG.error(msg);
throw new SamzaSqlValidatorException(msg);
}
@@ -159,8 +159,9 @@ class Checker implements SqlOperandTypeChecker {
static SqlTypeName toCalciteSqlType(SamzaSqlFieldType samzaSqlFieldType) {
switch (samzaSqlFieldType) {
case ANY:
- case ROW:
return SqlTypeName.ANY;
+ case ROW:
+ return SqlTypeName.ROW;
case MAP:
return SqlTypeName.MAP;
case ARRAY:
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 767df43..dc37753 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -20,15 +20,13 @@
package org.apache.samza.sql.planner;
import com.google.common.collect.ImmutableList;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
@@ -62,10 +60,10 @@ import org.apache.samza.SamzaException;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.UdfMetadata;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
import org.apache.samza.sql.schema.SqlSchema;
-import org.apache.samza.sql.interfaces.UdfMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,56 +120,42 @@ public class QueryPlanner {
}
private Planner getPlanner() {
- Planner planner = null;
- try {
- Connection connection = DriverManager.getConnection("jdbc:calcite:");
- CalciteConnection calciteConnection =
connection.unwrap(CalciteConnection.class);
- SchemaPlus rootSchema = calciteConnection.getRootSchema();
- registerSourceSchemas(rootSchema);
-
- List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
- .map(SamzaSqlScalarFunctionImpl::new)
- .collect(Collectors.toList());
-
- final List<RelTraitDef> traitDefs = new ArrayList<>();
-
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
-
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(new SamzaSqlOperatorTable());
- sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
-
- // TODO: Introduce a pluggable rule factory.
- List<RelOptRule> rules = ImmutableList.of(
- FilterProjectTransposeRule.INSTANCE,
- ProjectMergeRule.INSTANCE,
- new
SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true,
RelFactories.LOGICAL_BUILDER,
- systemStreamConfigBySource));
-
- // Using lenient so that !=,%,- are allowed.
- FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
- .parserConfig(SqlParser.configBuilder()
- .setLex(Lex.JAVA)
- .setConformance(SqlConformanceEnum.LENIENT)
- .setCaseSensitive(false) // Make Udfs case insensitive
- .build())
- .defaultSchema(rootSchema)
- .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
- .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
- .traitDefs(traitDefs)
- .programs(Programs.hep(rules, true,
DefaultRelMetadataProvider.INSTANCE))
- .build();
- planner = Frameworks.getPlanner(frameworkConfig);
- return planner;
- } catch (Exception e) {
- String errorMsg = "Failed to create planner.";
- LOG.error(errorMsg, e);
- if (planner != null) {
- planner.close();
- }
- throw new SamzaException(errorMsg, e);
- }
+ Planner planner;
+ SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
+ registerSourceSchemas(rootSchema);
+
+ List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions =
+
udfMetadata.stream().map(SamzaSqlScalarFunctionImpl::new).collect(Collectors.toList());
+
+ final List<RelTraitDef> traitDefs = new ArrayList<>();
+
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(new SamzaSqlOperatorTable());
+ sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
+
+ // TODO: Introduce a pluggable rule factory.
+ List<RelOptRule> rules =
ImmutableList.of(FilterProjectTransposeRule.INSTANCE, ProjectMergeRule.INSTANCE,
+ new
SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true,
RelFactories.LOGICAL_BUILDER,
+ systemStreamConfigBySource));
+
+ // Using lenient so that !=,%,- are allowed.
+ FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder()
+ .setLex(Lex.JAVA)
+ .setConformance(SqlConformanceEnum.LENIENT)
+ .setCaseSensitive(false) // Make Udfs case insensitive
+ .build())
+ .defaultSchema(rootSchema)
+ .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
+ .traitDefs(traitDefs)
+ .programs(Programs.hep(rules, true,
DefaultRelMetadataProvider.INSTANCE))
+ .build();
+ planner = Frameworks.getPlanner(frameworkConfig);
+ return planner;
}
private RelRoot optimize(Planner planner, RelRoot relRoot) {
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index c3735a4..fcc289c 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -21,11 +21,13 @@ package org.apache.samza.sql.planner;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
import org.apache.calcite.sql.type.ArraySqlType;
import org.apache.calcite.sql.type.MapSqlType;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
@@ -94,6 +96,11 @@ public class RelSchemaConverter extends SqlTypeFactoryImpl {
case INT64:
return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT),
true);
case ROW:
+ final RelDataType rowType =
convertToRelSchema(fieldSchema.getRowSchema());
+ /* Using Fully Qualified names to ensure that at the last project the
row is fully reconstructed */
+ return
createTypeWithNullability(createStructType(StructKind.FULLY_QUALIFIED,
+
rowType.getFieldList().stream().map(RelDataTypeField::getType).collect(Collectors.toList()),
+ rowType.getFieldNames()), true);
case ANY:
// TODO Calcite execution engine doesn't support record type yet.
return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
index 3098af7..6b8e8ba 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
@@ -35,6 +35,7 @@ import org.apache.calcite.sql.fun.SqlMultisetValueConstructor;
import org.apache.calcite.sql.fun.SqlRowOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
+import org.apache.samza.sql.udf.GetNestedField;
/**
@@ -143,6 +144,7 @@ public class SamzaSqlOperatorTable extends
ReflectiveSqlOperatorTable {
public static final SqlFunction TUMBLE = SqlStdOperatorTable.TUMBLE;
public static final SqlFunction TUMBLE_END = SqlStdOperatorTable.TUMBLE_END;
public static final SqlFunction TUMBLE_START =
SqlStdOperatorTable.TUMBLE_START;
+ public static final SqlFunction GET_NESTED_FIELD_OP =
GetNestedField.INSTANCE;
public SamzaSqlOperatorTable() {
init();
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index ce42bee..23d7be5 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -61,6 +61,7 @@ public class SamzaSqlApplicationRunner implements
ApplicationRunner {
* NOTE: This constructor is called from {@link ApplicationRunners} through
reflection.
* Please refrain from updating the signature or removing this constructor
unless the caller has changed the interface.
*/
+ @SuppressWarnings("unused") /* used via reflection */
public SamzaSqlApplicationRunner(SamzaApplication app, Config config) {
this(app, false, config);
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index 604f061..b42569c 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -80,6 +80,7 @@ class FilterTranslator {
this.context = context;
this.translatorContext = ((SamzaSqlApplicationContext)
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.filter = (LogicalFilter)
this.translatorContext.getRelNode(filterId);
+ LOG.info("Compiling Operator {}", filter.getDigest());
this.expr =
this.translatorContext.getExpressionCompiler().compile(filter.getInputs(),
Collections.singletonList(filter.getCondition()));
ContainerContext containerContext = context.getContainerContext();
metricsRegistry = containerContext.getContainerMetricsRegistry();
@@ -96,9 +97,10 @@ class FilterTranslator {
public boolean apply(SamzaSqlRelMessage message) {
long startProcessing = System.nanoTime();
Object[] result = new Object[1];
+ Object[] inputRow =
ProjectTranslator.convertToJavaRow(message.getSamzaSqlRelRecord());
try {
expr.execute(translatorContext.getExecutionContext(), context,
translatorContext.getDataContext(),
- message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
+ inputRow, result);
} catch (Exception e) {
String errMsg = String.format("Handling the following rel message ran
into an error. %s", message);
LOG.error(errMsg, e);
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
index 1a0a7ec..c51ff25 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
@@ -74,7 +74,7 @@ public class JoinInputNode {
}
String getSourceName() {
- return
SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
+ return
SqlIOConfig.getSourceFromSourceParts(getTableScan(relNode).getTable().getQualifiedName());
}
boolean isPosOnRight() {
@@ -109,4 +109,12 @@ public class JoinInputNode {
}
}
+ private static TableScan getTableScan(RelNode relNode) {
+ if (relNode instanceof TableScan) {
+ return (TableScan) relNode;
+ }
+ // we deal with Single inputs filter/project
+ assert relNode.getInputs().size() == 1;
+ return getTableScan(relNode.getInput(0));
+ }
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 02c434d..26d9fa0 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -40,6 +40,7 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSlot;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlKind;
@@ -79,7 +80,7 @@ import static
org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeK
class JoinTranslator {
private static final Logger log =
LoggerFactory.getLogger(JoinTranslator.class);
- private String logicalOpId;
+ private final String logicalOpId;
private final String intermediateStreamPrefix;
private final int queryId;
private final TranslatorInputMetricsMapFunction inputMetricsMF;
@@ -165,8 +166,16 @@ class JoinTranslator {
if (tableNode.isRemoteTable()) {
String remoteTableName = tableNode.getSourceName();
- StreamTableJoinFunction joinFn = new
SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
- context.getTableKeyConverter(remoteTableName), streamNode,
tableNode, join.getJoinType(), queryId);
+ MessageStream<SamzaSqlRelMessage> operatorStack =
context.getMessageStream(tableNode.getRelNode().getId());
+ final StreamTableJoinFunction<Object, SamzaSqlRelMessage, KV,
SamzaSqlRelMessage> joinFn;
+ if (operatorStack != null && operatorStack instanceof
MessageStreamCollector) {
+ joinFn = new
SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
+ context.getTableKeyConverter(remoteTableName), streamNode,
tableNode, join.getJoinType(), queryId,
+ (MessageStreamCollector) operatorStack);
+ } else {
+ joinFn = new
SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
+ context.getTableKeyConverter(remoteTableName), streamNode,
tableNode, join.getJoinType(), queryId);
+ }
return inputStream
.map(inputMetricsMF)
@@ -175,7 +184,11 @@ class JoinTranslator {
// Join with the local table
- StreamTableJoinFunction joinFn = new
SamzaSqlLocalTableJoinFunction(streamNode, tableNode, join.getJoinType());
+ StreamTableJoinFunction<SamzaSqlRelRecord,
+ SamzaSqlRelMessage,
+ KV<SamzaSqlRelRecord, SamzaSqlRelMessage>,
+ SamzaSqlRelMessage>
+ joinFn = new SamzaSqlLocalTableJoinFunction(streamNode, tableNode,
join.getJoinType());
SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
(SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new
SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
@@ -270,8 +283,8 @@ class JoinTranslator {
isTablePosOnRight ? join.getRowType().getFieldCount() :
join.getLeft().getRowType().getFieldCount();
List<Integer> tableRefsIdx = refCollector.stream()
- .map(x -> x.getIndex())
- .filter(x -> tableStartIndex <= x && x < tableEndIndex) // collect all
the refs form table side
+ .map(RexSlot::getIndex)
+ .filter(x -> (tableStartIndex <= x) && (x < tableEndIndex)) // collect
all the refs form table side
.map(x -> x - tableStartIndex) // re-adjust the offset
.sorted()
.collect(Collectors.toList()); // we have a list with all the input
from table side with 0 based index.
@@ -344,11 +357,12 @@ class JoinTranslator {
private void validateJoinKeyType(RexInputRef ref) {
SqlTypeName sqlTypeName = ref.getType().getSqlTypeName();
- // Primitive types and ANY (for the record key) are supported in the key
+ // Primitive types and Row (for the record key) and ANY for other fields
like __key__
+ // TODO this need to be pulled to a common class/place that have all the
supported types
if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName !=
SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
&& sqlTypeName != SqlTypeName.INTEGER && sqlTypeName !=
SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
&& sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName !=
SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT
- && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER)
{
+ && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER
&& sqlTypeName != SqlTypeName.ROW) {
log.error("Unsupported key type " + sqlTypeName + " used in join
condition.");
throw new SamzaException("Unsupported key type used in join condition.");
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
new file mode 100644
index 0000000..0ddc136
--- /dev/null
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.function.Function;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Collects the Map and Filter Samza functions that are stacked on top of a Remote Table scan.
+ * The Join operator uses this collector to apply those functions once the join lookup has been done.
+ *
+ * Note that this is needed because the Remote Table can not expose a proper {@code MessageStream}.
+ * It is a workaround to minimize the amount of code changes to the current Query Translator
+ * {@link org.apache.samza.sql.translator.QueryTranslator}. In an ideal world, we would use the Calcite planner in the
+ * conventional way, so that functions could be combined during the translation of RelNodes.
+ */
+class MessageStreamCollector implements MessageStream<SamzaSqlRelMessage>, Serializable, Closeable {
+
+  /**
+   * FIFO queue of the operators on top of the Remote Table Scan: the first function added is the first one fired.
+   */
+  private final Deque<MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage>> mapFnCallQueue =
+      new ArrayDeque<>();
+
+  /**
+   * Chains the {@code close()} call of every collected operator; rebuilt by {@link #getFunction(Context)}.
+   */
+  private transient Function<Void, Void> closeFn = aVoid -> null;
+
+  /**
+   * Records {@code mapFn} in the call queue instead of creating a new stream, and returns this collector.
+   */
+  @Override
+  public <OM> MessageStream<OM> map(MapFunction<? super SamzaSqlRelMessage, ? extends OM> mapFn) {
+    mapFnCallQueue.offer((MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage>) mapFn);
+    return (MessageStream<OM>) this;
+  }
+
+  /**
+   * Records {@code filterFn}, wrapped as a map function that yields {@code null} for rejected rows, and returns
+   * this collector.
+   */
+  @Override
+  public MessageStream<SamzaSqlRelMessage> filter(FilterFunction<? super SamzaSqlRelMessage> filterFn) {
+    mapFnCallQueue.offer(new FilterMapAdapter(filterFn));
+    return this;
+  }
+
+  /**
+   * This function is called by the join operator at run time to apply the filters and projects post join lookup.
+   * It drains the queue of collected functions, initializes each of them with {@code context} and chains their
+   * {@code close()} calls so that {@link #close()} releases them. Because the queue is drained, a subsequent call
+   * returns the identity function.
+   *
+   * @param context Samza Execution Context
+   * @return a function returning {@code null} when a filter rejects the row (or the input is {@code null}),
+   *         otherwise the Samza Relational Record as it goes through the Projects.
+   */
+  Function<SamzaSqlRelMessage, SamzaSqlRelMessage> getFunction(Context context) {
+    Function<SamzaSqlRelMessage, SamzaSqlRelMessage> tailFn = null;
+    Function<Void, Void> intFn = aVoid -> null; // Projects and Filters both need to be initialized.
+    closeFn = aVoid -> null;
+    // At this point we have the queue of operators, where first in is the first operator on top of TableScan.
+    while (!mapFnCallQueue.isEmpty()) {
+      MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage> f = mapFnCallQueue.poll();
+      intFn = intFn.andThen((aVoid) -> {
+        f.init(context);
+        return null;
+      });
+      // Function#andThen returns a new composed function, so the result has to be re-assigned;
+      // otherwise close() would never reach the collected operators.
+      closeFn = closeFn.andThen((aVoid) -> {
+        f.close();
+        return null;
+      });
+
+      Function<SamzaSqlRelMessage, SamzaSqlRelMessage> current = x -> x == null ? null : f.apply(x);
+      if (tailFn == null) {
+        tailFn = current;
+      } else {
+        // compose() runs the previously collected functions first, then the current one.
+        tailFn = current.compose(tailFn);
+      }
+    }
+    // TODO TBH not sure about this need to check if Samza Framework will be okay with late init call.
+    intFn.apply(null); // Init call has to happen here.
+    return tailFn == null ? Function.identity() : tailFn;
+  }
+
+  /**
+   * Filter adapter used to compose filters with {@code MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage>}.
+   * The adapter returns {@code null} when the input is {@code null} or the filter condition rejects the current row.
+   */
+  private static class FilterMapAdapter implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+    private final FilterFunction<? super SamzaSqlRelMessage> filterFn;
+
+    private FilterMapAdapter(FilterFunction<? super SamzaSqlRelMessage> filterFn) {
+      this.filterFn = filterFn;
+    }
+
+    @Override
+    public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
+      if (message != null && filterFn.apply(message)) {
+        return message;
+      }
+      // null on case no match
+      return null;
+    }
+
+    @Override
+    public void close() {
+      filterFn.close();
+    }
+
+    @Override
+    public void init(Context context) {
+      filterFn.init(context);
+    }
+  }
+
+  /**
+   * Runs the chained {@code close()} calls built by {@link #getFunction(Context)}.
+   */
+  @Override
+  public void close() {
+    // closeFn is transient, hence the null guard.
+    if (closeFn != null) {
+      closeFn.apply(null);
+    }
+  }
+
+  // NOTE(review): flatMap and flatMapAsync return null while every other unsupported operation below throws
+  // IllegalStateException -- confirm whether this difference is intended.
+  @Override
+  public <OM> MessageStream<OM> flatMap(FlatMapFunction<? super SamzaSqlRelMessage, ? extends OM> flatMapFn) {
+    return null;
+  }
+
+  @Override
+  public <OM> MessageStream<OM> flatMapAsync(
+      AsyncFlatMapFunction<? super SamzaSqlRelMessage, ? extends OM> asyncFlatMapFn) {
+    return null;
+  }
+
+  // The remaining MessageStream operations are not supported by this collector and always throw.
+  @Override
+  public void sink(SinkFunction<? super SamzaSqlRelMessage> sinkFn) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public MessageStream<SamzaSqlRelMessage> sendTo(OutputStream<SamzaSqlRelMessage> outputStream) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<SamzaSqlRelMessage, K, WV> window, String id) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
+      JoinFunction<? extends K, ? super SamzaSqlRelMessage, ? super OM, ? extends JM> joinFn, Serde<K> keySerde,
+      Serde<SamzaSqlRelMessage> messageSerde, Serde<OM> otherMessageSerde, Duration ttl, String id) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
+      StreamTableJoinFunction<? extends K, ? super SamzaSqlRelMessage, ? super R, ? extends JM> joinFn,
+      Object... args) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public MessageStream<SamzaSqlRelMessage> merge(
+      Collection<? extends MessageStream<? extends SamzaSqlRelMessage>> otherStreams) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super SamzaSqlRelMessage, ? extends K> keyExtractor,
+      MapFunction<? super SamzaSqlRelMessage, ? extends V> valueExtractor, KVSerde<K, V> serde, String id) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object... args) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public MessageStream<SamzaSqlRelMessage> broadcast(Serde<SamzaSqlRelMessage> serde, String id) {
+    throw new IllegalStateException("Not valid state");
+  }
+}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index bf44815..16a320e 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -19,12 +19,15 @@
package org.apache.samza.sql.translator;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.calcite.DataContext;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
@@ -38,7 +41,9 @@ import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
@@ -50,7 +55,7 @@ import org.slf4j.LoggerFactory;
* Translator to translate the Project node in the relational graph to the
corresponding StreamGraph
* implementation.
*/
-class ProjectTranslator {
+public class ProjectTranslator {
private static final Logger LOG =
LoggerFactory.getLogger(ProjectTranslator.class);
//private transient int messageIndex = 0;
@@ -61,6 +66,122 @@ class ProjectTranslator {
}
/**
+ * Converts the resulting row from Calcite Expression Evaluator to
SamzaRelRecord to be sent downstream.
+ *
+ * @param objects input objects to be converted
+ * @param rowType Calcite row type of the resulting row
+ * @return a SamzaSqlRelRecord built from the row type's field names and the converted values
+ */
+ public static SamzaSqlRelRecord buildSamzaRelRecord(Object[] objects,
RelDataType rowType) {
+ Preconditions.checkNotNull(objects, "Input objects can not be null");
+ Preconditions.checkState(rowType.isStruct(), "Row Type has to be a Struct
and got " + rowType.getSqlTypeName());
+ Preconditions.checkState(objects.length == rowType.getFieldCount(),
+ "Objects counts and type counts must match " + objects.length + " vs "
+ rowType.getFieldCount());
+ List<String> names = new ArrayList<>(rowType.getFieldNames());
+ List<Object> values = new ArrayList<>(rowType.getFieldCount());
+ for (int i = 0; i < objects.length; i++) {
+ Object val = objects[i];
+ if (val == null) {
+ values.add(null);
+ continue;
+ }
+ final RelDataType valueType = rowType.getFieldList().get(i).getType();
+ values.add(convertToSamzaSqlType(val, valueType));
+ }
+ return new SamzaSqlRelRecord(names, values);
+ }
+
+ /**
+ * Recursively converts a Primitive Java Object to valid Samza Rel Record
field type.
+ *
+ * @param value input value to be converted
+ * @param dataType value type as derived by Calcite
+ * @return SamzaRelRecord or primitive SamzaRelRecord field.
+ *
+ */
+ private static Object convertToSamzaSqlType(Object value, RelDataType
dataType) {
+ if (value == null) {
+ return null;
+ }
+ switch (dataType.getSqlTypeName()) {
+ case ROW:
+ List<String> names = new ArrayList<>(dataType.getFieldNames());
+ // A Row Struct is represented as an Object array in Calcite.
+ Object[] row = (Object[]) value;
+ List<Object> values = new ArrayList<>(row.length);
+ for (int i = 0; i < row.length; i++) {
+ values.add(convertToSamzaSqlType(row[i],
dataType.getFieldList().get(i).getType()));
+ }
+ return new SamzaSqlRelRecord(names, values);
+ case MAP:
+ Map<Object, Object> objectMap = (Map<Object, Object>) value;
+ Map<Object, Object> resultMap = new HashMap<>();
+ final RelDataType valuesType = dataType.getValueType();
+ objectMap.forEach((key, v) -> resultMap.put(key,
convertToSamzaSqlType(v, valuesType)));
+ return resultMap;
+ case ARRAY:
+ List<Object> objectList = (List<Object>) value;
+ final RelDataType elementsType = dataType.getComponentType();
+ return objectList.stream().map(e -> convertToSamzaSqlType(e,
elementsType)).collect(Collectors.toList());
+ case BOOLEAN:
+ case BIGINT:
+ case BINARY:
+ case INTEGER:
+ case TINYINT:
+ case DOUBLE:
+ case FLOAT:
+ case REAL:
+ case VARCHAR:
+ case CHAR:
+ case VARBINARY:
+ case ANY:
+ case OTHER:
+ // today we treat everything else as Type Any or Other, this is not
ideal.
+ // this will change when adding timestamps support or more complex non
java primitive types.
+ // TODO in a better world we need to add type factory that can do the
conversion between calcite and samza.
+ return value;
+ default:
+ // Any SQL type not listed above is not supported yet, so fail loudly.
+ throw new IllegalStateException("Unknown SQL type " +
dataType.getSqlTypeName());
+ }
+ }
+
+ /**
+ * Converts the Samza record to a Java primitive row format that follows the
convention of Calcite Enumerable operators.
+ *
+ * @param samzaSqlRelRecord input record.
+ * @return row of Java primitives conforming to
org.apache.calcite.adapter.enumerable.JavaRowFormat#ARRAY
+ */
+ public static Object[] convertToJavaRow(SamzaSqlRelRecord samzaSqlRelRecord)
{
+ if (samzaSqlRelRecord == null) {
+ return null;
+ }
+ Object[] inputRow = new Object[samzaSqlRelRecord.getFieldValues().size()];
+ for (int i = 0; i < inputRow.length; i++) {
+ inputRow[i] =
asPrimitiveJavaRow(samzaSqlRelRecord.getFieldValues().get(i));
+ }
+ return inputRow;
+ }
+
+ private static Object asPrimitiveJavaRow(Object inputObject) {
+ if (inputObject == null) {
+ return null;
+ }
+ if (inputObject instanceof SamzaSqlRelRecord) {
+ return convertToJavaRow((SamzaSqlRelRecord) inputObject);
+ }
+ if (inputObject instanceof List) {
+ return ((List) inputObject).stream().map(e ->
asPrimitiveJavaRow(e)).collect(Collectors.toList());
+ }
+ if (inputObject instanceof Map) {
+ Map<Object, Object> objectMap = new HashMap<>();
+ ((Map<Object, Object>) inputObject).forEach((k, v) -> objectMap.put(k,
asPrimitiveJavaRow(v)));
+ return objectMap;
+ }
+ return inputObject;
+ }
+
+ /**
* ProjectMapFunction implements MapFunction to map input
SamzaSqlRelMessages, one at a time, to a new
* SamzaSqlRelMessage which consists of the projected fields
*/
@@ -94,6 +215,7 @@ class ProjectTranslator {
this.translatorContext =
((SamzaSqlApplicationContext)
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.project = (Project) this.translatorContext.getRelNode(projectId);
+ LOG.info("Compiling operator {} ", project.getDigest());
this.expr =
this.translatorContext.getExpressionCompiler().compile(project.getInputs(),
project.getProjects());
ContainerContext containerContext = context.getContainerContext();
metricsRegistry = containerContext.getContainerMetricsRegistry();
@@ -114,20 +236,19 @@ class ProjectTranslator {
long arrivalTime = System.nanoTime();
RelDataType type = project.getRowType();
Object[] output = new Object[type.getFieldCount()];
+ Object[] inputRow = convertToJavaRow(message.getSamzaSqlRelRecord());
+ SamzaSqlExecutionContext execContext =
translatorContext.getExecutionContext();
+ DataContext dataRootContext = translatorContext.getDataContext();
try {
- expr.execute(translatorContext.getExecutionContext(), context,
translatorContext.getDataContext(),
- message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
+ expr.execute(execContext, context, dataRootContext, inputRow, output);
} catch (Exception e) {
String errMsg = String.format("Handling the following rel message ran
into an error. %s", message);
LOG.error(errMsg, e);
throw new SamzaException(errMsg, e);
}
- List<String> names = new ArrayList<>();
- for (int index = 0; index < output.length; index++) {
- names.add(index, project.getNamedProjects().get(index).getValue());
- }
+ SamzaSqlRelRecord record = buildSamzaRelRecord(output,
project.getRowType());
updateMetrics(arrivalTime, System.nanoTime(),
message.getSamzaSqlRelMsgMetadata().isNewInputMessage);
- return new SamzaSqlRelMessage(names, Arrays.asList(output),
message.getSamzaSqlRelMsgMetadata());
+ return new SamzaSqlRelMessage(record,
message.getSamzaSqlRelMsgMetadata());
}
/**
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
index 6a93b2d..8a60502 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
@@ -21,6 +21,8 @@ package org.apache.samza.sql.translator;
import java.util.List;
import java.util.Objects;
+import java.util.function.Function;
+import javax.annotation.Nullable;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
@@ -42,15 +44,35 @@ public class SamzaSqlRemoteTableJoinFunction
private transient SamzaRelTableKeyConverter relTableKeyConverter;
private final String tableName;
private final int queryId;
+ /**
+ * Projection and Filter Function to apply post the join lookup. Function
will return null in case filter rejects row.
+ */
+ private Function<SamzaSqlRelMessage, SamzaSqlRelMessage> projectFunction;
+ /**
+ * Projects and Filters operator queue.
+ */
+ private final MessageStreamCollector messageStreamCollector;
+
+ public SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter,
SamzaRelTableKeyConverter tableKeyConverter,
+ JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType
joinRelType, int queryId,
+ MessageStreamCollector messageStreamCollector) {
+ super(streamNode, tableNode, joinRelType);
+ this.msgConverter = msgConverter;
+ this.relTableKeyConverter = tableKeyConverter;
+ this.tableName = tableNode.getSourceName();
+ this.queryId = queryId;
+ this.messageStreamCollector = messageStreamCollector;
+ }
SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter,
SamzaRelTableKeyConverter tableKeyConverter,
JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType
joinRelType, int queryId) {
super(streamNode, tableNode, joinRelType);
-
this.msgConverter = msgConverter;
this.relTableKeyConverter = tableKeyConverter;
this.tableName = tableNode.getSourceName();
this.queryId = queryId;
+ this.projectFunction = Function.identity();
+ this.messageStreamCollector = null;
}
@Override
@@ -59,13 +81,24 @@ public class SamzaSqlRemoteTableJoinFunction
((SamzaSqlApplicationContext)
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.msgConverter = translatorContext.getMsgConverter(tableName);
this.relTableKeyConverter =
translatorContext.getTableKeyConverter(tableName);
+ if (messageStreamCollector != null) {
+ projectFunction = messageStreamCollector.getFunction(context);
+ }
}
+ /**
+ * Compute the projection and filter post join lookup.
+ *
+ * @param record input record as result of lookup
+ * @return the projected row or {@code null} if Row doesn't pass the filter
condition.
+ */
@Override
+ @Nullable
protected List<Object> getTableRelRecordFieldValues(KV record) {
// Using the message rel converter, convert message to sql rel message and
add to output values.
- SamzaSqlRelMessage relMessage = msgConverter.convertToRelMessage(record);
- return relMessage.getSamzaSqlRelRecord().getFieldValues();
+ final SamzaSqlRelMessage relMessage =
msgConverter.convertToRelMessage(record);
+ final SamzaSqlRelMessage result = projectFunction.apply(relMessage);
+ return result == null ? null :
result.getSamzaSqlRelRecord().getFieldValues();
}
@Override
@@ -76,6 +109,8 @@ public class SamzaSqlRemoteTableJoinFunction
return null;
}
// Using the table key converter, convert message key from rel format to
the record key format.
+ // TODO: One way to avoid the object type here is to ensure that:
+ // table's key is a SamzaRelRecord or any well defined type when defining
the table descriptor
return relTableKeyConverter.convertToTableKeyFormat(keyRecord);
}
@@ -83,4 +118,12 @@ public class SamzaSqlRemoteTableJoinFunction
public Object getRecordKey(KV record) {
return record.getKey();
}
+
+ @Override
+ public void close() {
+ super.close();
+ if (messageStreamCollector != null) {
+ messageStreamCollector.close();
+ }
+ }
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
index cd7ecdc..e8fa451 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -21,6 +21,7 @@ package org.apache.samza.sql.translator;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.commons.lang3.Validate;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
@@ -48,6 +49,7 @@ public abstract class SamzaSqlTableJoinFunction<K, R>
// Table field names are used in the outer join when the table record is not
found.
private final ArrayList<String> tableFieldNames;
private final ArrayList<String> outFieldNames;
+ final private List<Object> nullRow;
SamzaSqlTableJoinFunction(JoinInputNode streamNode, JoinInputNode tableNode,
JoinRelType joinRelType) {
this.joinRelType = joinRelType;
@@ -69,6 +71,7 @@ public abstract class SamzaSqlTableJoinFunction<K, R>
outFieldNames.addAll(tableFieldNames);
outFieldNames.addAll(streamNode.getFieldNames());
}
+ nullRow = tableFieldNames.stream().map(x ->
null).collect(Collectors.toList());
}
@Override
@@ -93,7 +96,10 @@ public abstract class SamzaSqlTableJoinFunction<K, R>
// Add the table record fields.
if (record != null) {
- outFieldValues.addAll(getTableRelRecordFieldValues(record));
+ List<Object> row = getTableRelRecordFieldValues(record);
+ // A null row means the filter rejected the record: drop the row for an inner
join, otherwise pad the table fields with nulls.
+ if (row == null && joinRelType.compareTo(JoinRelType.INNER) == 0) return
null;
+ outFieldValues.addAll(row == null ? nullRow : row);
} else {
// Table record could be null as the record could not be found in the
store. This can
// happen for outer joins. Add nulls to all the field values in the
output message.
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index f58140d..87c4e00 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -170,7 +170,10 @@ class ScanTranslator {
// SqlIOResolverFactory.
// For local table, even though table descriptor is already defined, we
still need to create the input stream
// descriptor to load the local table.
+ // To handle the case where a project or filter is pushed to a remote table, the Scan
will collect the operators and feed them to the join operator.
+ // TODO In an ideal world this has to change and use Calcite Pattern
matching to translate the plan.
if (isRemoteTable) {
+ context.registerMessageStream(tableScan.getId(), new
MessageStreamCollector());
return;
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index 5990897..7d85652 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.MessageStream;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.sql.data.RexToJavaCompiler;
@@ -52,7 +53,7 @@ public class TranslatorContext implements Cloneable {
private final RexToJavaCompiler compiler;
private final Map<String, SamzaRelConverter> relSamzaConverters;
private final Map<String, SamzaRelTableKeyConverter> relTableKeyConverters;
- private final Map<Integer, MessageStream> messageStreams;
+ private final Map<Integer, MessageStream<SamzaSqlRelMessage>> messageStreams;
private final Map<Integer, RelNode> relNodes;
private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
@@ -199,7 +200,7 @@ public class TranslatorContext implements Cloneable {
* @param id the id
* @return the message stream
*/
- MessageStream getMessageStream(int id) {
+ MessageStream<SamzaSqlRelMessage> getMessageStream(int id) {
return messageStreams.get(id);
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
b/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
new file mode 100644
index 0000000..87cf486
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.udf;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.adapter.enumerable.CallImplementor;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.tree.ConstantExpression;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.ExpressionType;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.ImplementableFunction;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+
+import static org.apache.calcite.schema.impl.ReflectiveFunctionBase.builder;
+
+
+/**
+ * Operator to extract nested Rows or Fields from a struct row type using a
dotted path.
+ * The goal of this operator is two-fold.
+ * First it is a temporary fix for
https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
+ * Second it will enable smooth backward compatible migration from existing
udf that relies on legacy row format.
+ */
+public class GetNestedField extends SqlUserDefinedFunction {
+
+ public static final SqlFunction INSTANCE = new GetNestedField(new
ExtractFunction());
+
+ public GetNestedField(Function function) {
+ super(new SqlIdentifier("GetNestedField", SqlParserPos.ZERO), null, null,
null, ImmutableList.of(), function);
+ }
+
+ @Override
+ public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.of(2);
+ }
+
+ @Override
+ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean
throwOnFailure) {
+ final SqlNode left = callBinding.operand(0);
+ final SqlNode right = callBinding.operand(1);
+ final RelDataType type =
callBinding.getValidator().deriveType(callBinding.getScope(), left);
+ boolean isRow = true;
+ if (type.getSqlTypeName() != SqlTypeName.ROW) {
+ isRow = false;
+ } else if (type.getSqlIdentifier().isStar()) {
+ isRow = false;
+ }
+ if (!isRow && throwOnFailure) {
+ throw callBinding.newValidationSignatureError();
+ }
+ return isRow && OperandTypes.STRING.checkSingleOperandType(callBinding,
right, 0, throwOnFailure);
+ }
+
+ @Override
+ public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+ final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+ final RelDataType recordType = opBinding.getOperandType(0);
+ switch (recordType.getSqlTypeName()) {
+ case ROW:
+ final String fieldName = opBinding.getOperandLiteralValue(1,
String.class);
+ String[] fieldNameChain = fieldName.split("\\.");
+ RelDataType relDataType = opBinding.getOperandType(0);
+ for (int i = 0; i < fieldNameChain.length; i++) {
+ RelDataTypeField t = relDataType.getField(fieldNameChain[i], true,
true);
+ Preconditions.checkNotNull(t,
+ "Can not find " + fieldNameChain[i] + " within record " +
recordType.toString() + " Original String "
+ + Arrays.toString(fieldNameChain) + " Original row " +
recordType.toString());
+ relDataType = t.getType();
+ }
+ if (recordType.isNullable()) {
+ return typeFactory.createTypeWithNullability(relDataType, true);
+ } else {
+ return relDataType;
+ }
+ default:
+ throw new AssertionError("First Operand is suppose to be a Row
Struct");
+ }
+ }
+
+ private static class ExtractFunction implements ScalarFunction,
ImplementableFunction {
+ private final JavaTypeFactoryImpl javaTypeFactory = new
JavaTypeFactoryImpl();
+
+ @Override
+ public CallImplementor getImplementor() {
+ return RexImpTable.createImplementor((translator, call,
translatedOperands) -> {
+ Preconditions.checkState(translatedOperands.size() == 2 &&
call.operands.size() == 2,
+ "Expected 2 operands found " + Math.min(translatedOperands.size(),
call.getOperands().size()));
+ Expression op0 = translatedOperands.get(0);
+ Expression op1 = translatedOperands.get(1);
+
Preconditions.checkState(op1.getNodeType().equals(ExpressionType.Constant),
+ "Operand 2 has to be constant and got " + op1.getNodeType());
+ Preconditions.checkState(op1.type.equals(String.class), "Operand 2 has
to be String and got " + op1.type);
+ final String fieldName = (String) ((ConstantExpression) op1).value;
+ String[] fieldNameChain = fieldName.split("\\.");
+ RelDataType relDataType = call.operands.get(0).getType();
+
Preconditions.checkState(relDataType.getSqlTypeName().equals(SqlTypeName.ROW),
+ "Expected first operand to be ROW found " +
relDataType.toString());
+ Expression currentExpression = op0;
+ for (int i = 0; i < fieldNameChain.length; i++) {
+ Preconditions.checkState(relDataType.getSqlTypeName() ==
SqlTypeName.ROW,
+ "Must be ROW found " + relDataType.toString());
+ RelDataTypeField t = relDataType.getField(fieldNameChain[i], true,
true);
+ Preconditions.checkNotNull(t,
+ "Notfound " + fieldNameChain[i] + " in the following struct " +
relDataType.toString()
+ + " Original String " + Arrays.toString(fieldNameChain) + "
Original row " + call.operands.get(0)
+ .getType());
+ currentExpression =
Expressions.arrayIndex(Expressions.convert_(currentExpression, Object[].class),
+ Expressions.constant(t.getIndex()));
+ relDataType = t.getType();
+ }
+ Type fieldType = javaTypeFactory.getJavaClass(relDataType);
+ return EnumUtils.convert(currentExpression, fieldType);
+ }, NullPolicy.ARG0, false);
+ }
+
+ @Override
+ public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+ throw new IllegalStateException("should not be called");
+ }
+
+ @Override
+ public List<FunctionParameter> getParameters() {
+ return builder().add(Object[].class, "row").add(String.class,
"path").build();
+ }
+ }
+
+ @Override
+ public String getAllowedSignatures(String opNameToUse) {
+ return opNameToUse + "(<ROW>, <VARCHAR>)";
+ }
+}
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
index 2a9dc13..d234067 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
@@ -129,7 +129,7 @@ public class TestQueryPlanner {
LogicalJoin join = (LogicalJoin) relNode;
RelNode left = join.getLeft();
RelNode right = join.getRight();
- assertTrue(right instanceof LogicalTableScan);
+ assertTrue("Was instance of " + right.getClass(), right instanceof
LogicalProject);
if (enableOptimizer) {
assertTrue(left instanceof LogicalFilter);
assertEquals("=(1, $2)", ((LogicalFilter)
left).getCondition().toString());
@@ -187,9 +187,9 @@ public class TestQueryPlanner {
relNode = relNode.getInput(0);
assertTrue(relNode instanceof LogicalFilter);
if (enableOptimizer) {
- assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter)
relNode).getCondition().toString());
+ assertEquals("AND(=($2, $10), =($2, 'Mike'))", ((LogicalFilter)
relNode).getCondition().toString());
} else {
- assertEquals("AND(=($2, $9), =(1, $10), =($2, 'Mike'))",
((LogicalFilter) relNode).getCondition().toString());
+ assertEquals("AND(=(1, $11), =($2, $10), =($2, 'Mike'))",
((LogicalFilter) relNode).getCondition().toString());
}
relNode = relNode.getInput(0);
if (enableOptimizer) {
@@ -202,7 +202,7 @@ public class TestQueryPlanner {
LogicalJoin join = (LogicalJoin) relNode;
RelNode left = join.getLeft();
RelNode right = join.getRight();
- assertTrue(left instanceof LogicalTableScan);
+ assertTrue("was instance of " + left.getClass(), left instanceof
LogicalProject);
if (enableOptimizer) {
assertTrue(right instanceof LogicalFilter);
assertEquals("=(1, $2)", ((LogicalFilter)
right).getCondition().toString());
@@ -289,14 +289,14 @@ public class TestQueryPlanner {
assertTrue(relNode instanceof LogicalProject);
relNode = relNode.getInput(0);
assertTrue(relNode instanceof LogicalFilter);
- assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter)
relNode).getCondition().toString());
+ assertEquals("AND(=($2, $10), =($2, 'Mike'))", ((LogicalFilter)
relNode).getCondition().toString());
relNode = relNode.getInput(0);
assertTrue(relNode instanceof LogicalJoin);
assertEquals(2, relNode.getInputs().size());
LogicalJoin join = (LogicalJoin) relNode;
RelNode left = join.getLeft();
RelNode right = join.getRight();
- assertTrue(left instanceof LogicalTableScan);
+ assertTrue(left instanceof LogicalProject);
assertTrue(right instanceof LogicalFilter);
assertEquals("=($2, CAST(MyTest($2)):INTEGER)", ((LogicalFilter)
right).getCondition().toString());
assertTrue(right.getInput(0) instanceof LogicalTableScan);
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 47355be..cc33e5f 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -258,6 +258,9 @@ public class TestAvroSystemFactory implements SystemFactory
{
record.put("address", createProfileAddressRecord(index));
record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ?
null : index % COMPANIES.length);
record.put("phoneNumbers", createProfilePhoneNumbers(index %
PHONE_NUMBERS.length));
+ Map<String, Object> mapValues = new HashMap<>();
+ mapValues.put("key", createSimpleRecord(index, false));
+ record.put("mapValues", mapValues);
return record;
}
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index dc193a2..33e775b 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.sql.translator;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,7 +29,10 @@ import org.apache.calcite.DataContext;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.context.ContainerContext;
@@ -68,6 +72,7 @@ import static org.mockito.Mockito.when;
@PrepareForTest(LogicalProject.class)
public class TestProjectTranslator extends TranslatorTestBase {
private static final String LOGICAL_OP_ID = "sql0_project_0";
+ private static final String TEST_FIELD = "test_field";
@Test
public void testTranslate() throws IOException, ClassNotFoundException {
@@ -86,12 +91,23 @@ public class TestProjectTranslator extends
TranslatorTestBase {
when(mockProject.getInputs()).thenReturn(inputs);
when(mockProject.getInput()).thenReturn(mockInput);
RelDataType mockRowType = mock(RelDataType.class);
+ List<RelDataTypeField> relFields = new ArrayList<>();
+ String fieldName = TEST_FIELD;
+ int fieldPos = 0;
+ RelDataType dataType = mock(RelDataType.class);
+ when(dataType.getSqlTypeName()).thenReturn(SqlTypeName.ANY);
+ relFields.add(new RelDataTypeFieldImpl(fieldName, fieldPos, dataType));
when(mockRowType.getFieldCount()).thenReturn(1);
when(mockProject.getRowType()).thenReturn(mockRowType);
+
when(mockProject.getRowType().getSqlTypeName()).thenReturn(SqlTypeName.ROW);
+ when(mockProject.getRowType().getFieldList()).thenReturn(relFields);
+ when(mockProject.getRowType().isStruct()).thenReturn(true);
RexNode mockRexField = mock(RexNode.class);
List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
- namedProjects.add(Pair.of(mockRexField, "test_field"));
+ namedProjects.add(Pair.of(mockRexField, TEST_FIELD));
when(mockProject.getNamedProjects()).thenReturn(namedProjects);
+ when(mockProject.getRowType()).thenReturn(mockRowType);
+
when(mockProject.getRowType().getFieldNames()).thenReturn(ImmutableList.of(TEST_FIELD));
StreamApplicationDescriptorImpl mockAppDesc =
mock(StreamApplicationDescriptorImpl.class);
OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp =
mock(OperatorSpec.class);
MessageStream<SamzaSqlRelMessage> mockStream = new
MessageStreamImpl<>(mockAppDesc, mockInputOp);
@@ -152,7 +168,7 @@ public class TestProjectTranslator extends
TranslatorTestBase {
}).when(mockExpr).execute(eq(executionContext), eq(mockContext),
eq(dataContext),
eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()),
eq(result));
SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
- assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(),
Collections.singletonList("test_field"));
+ assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(),
Collections.singletonList(TEST_FIELD));
assertEquals(retMsg.getSamzaSqlRelRecord().getFieldValues(),
Collections.singletonList(mockFieldObj));
// Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 07c4ce2..931036e 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -313,30 +313,6 @@ public class TestQueryTranslator {
}
@Test (expected = SamzaException.class)
- public void testTranslateStreamTableJoinWithoutJoinOperator() {
- Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
- String sql =
- "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
- + " select p.name as profileName, pv.pageKey"
- + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
- + " where p.id = pv.profileId";
- config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
- Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true,
new MapConfig(config));
-
- List<String> sqlStmts = fetchSqlFromConfig(config);
- List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
- SamzaSqlApplicationConfig samzaSqlApplicationConfig = new
SamzaSqlApplicationConfig(new MapConfig(config),
-
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toList()),
-
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
-
- StreamApplicationDescriptorImpl streamAppDesc = new
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
- QueryTranslator translator = new QueryTranslator(streamAppDesc,
samzaSqlApplicationConfig);
-
- translator.translate(queryInfo.get(0), streamAppDesc, 0);
- }
-
- @Test (expected = SamzaException.class)
public void testTranslateStreamTableJoinWithFullJoinOperator() {
Map<String, String> config =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
String sql =
@@ -656,9 +632,9 @@ public class TestQueryTranslator {
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
-
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2",
output1PhysicalName);
+
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_3",
output1PhysicalName);
Assert.assertEquals("kafka", output2System);
-
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2",
output2PhysicalName);
+
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_3",
output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -668,9 +644,9 @@ public class TestQueryTranslator {
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PROFILE", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
-
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2",
input3PhysicalName);
+
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_3",
input3PhysicalName);
Assert.assertEquals("kafka", input4System);
-
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2",
input4PhysicalName);
+
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_3",
input4PhysicalName);
}
@Test
@@ -724,9 +700,9 @@ public class TestQueryTranslator {
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2",
output1PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3",
output1PhysicalName);
Assert.assertEquals("kafka", output2System);
- Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2",
output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3",
output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -736,9 +712,9 @@ public class TestQueryTranslator {
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PROFILE", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2",
input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3",
input3PhysicalName);
Assert.assertEquals("kafka", input4System);
- Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2",
input4PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3",
input4PhysicalName);
}
@Test
@@ -791,9 +767,9 @@ public class TestQueryTranslator {
Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2",
output1PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3",
output1PhysicalName);
Assert.assertEquals("kafka", output2System);
- Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2",
output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3",
output2PhysicalName);
Assert.assertEquals("testavro", output3System);
Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
@@ -803,9 +779,9 @@ public class TestQueryTranslator {
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PAGEVIEW", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2",
input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3",
input3PhysicalName);
Assert.assertEquals("kafka", input4System);
- Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2",
input4PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3",
input4PhysicalName);
}
@Test
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index b622e75..ca78af2 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -499,7 +499,6 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
Assert.assertEquals(IntStream.range(0,
numMessages).boxed().collect(Collectors.toList()), outMessages);
}
- @Ignore
@Test
public void testEndToEndNestedRecord() throws SamzaSqlValidatorException {
int numMessages = 10;
@@ -507,9 +506,12 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
- "Insert into testavro.outputTopic"
- + " select `phoneNumbers`[0].`kind`"
- + " from testavro.PROFILE as p";
+ "Insert into testavro.outputTopic (id, bool_value)"
+ // SQL arrays are one-indexed.
+ + " select `phoneNumbers`[1].`kind` as string_value,
p.address.streetnum.number as id, "
+ + " `phoneNumbers`[1].`kind` = 'Home' as bool_value,
cast(p.address.zip as bigint) as long_value"
+ + " from testavro.PROFILE as p where p.address.zip > 0 and
p.address.zip < 100003 ";
+
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -523,6 +525,72 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
Assert.assertEquals(numMessages, outMessages.size());
}
+ /**
+ * Tests the GetNestedField built-in operator end to end.
+ * @throws SamzaSqlValidatorException if validation of the SQL statement fails
+ */
+ @Test
+ public void testEndToEndGetNestedFieldOperator() throws
SamzaSqlValidatorException {
+ int numMessages = 10;
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+ String sql1 =
+ "Insert into testavro.outputTopic (string_value, id, bool_value,
double_value, map_values, long_value)"
+ + " select GetNestedField(address, 'streetnum.number') *
getNestedField(mapValues['key'], 'id') as id, "
+ + " cast(GetNestedField(address, 'streetnum').number * 1.0 as
double) as double_value, mapValues as map_values, "
+ + " GetNestedField(phoneNumbers[1] ,'kind') = 'Home' as
bool_value, cast( mapValues['key'].id as bigint) as long_value , "
+ + " GetNestedField(mapValues['key'], 'name') as string_value "
+ + " from testavro.PROFILE as p where GetNestedField(address,
'zip') > 0 and GetNestedField(address, 'zip') < 100003";
+ List<String> sqlStmts = Collections.singletonList(sql1);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+ runApplication(config);
+
+ List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
+ // check that the projected values are not null, correct types and good
values when easy to check.
+ List<GenericRecord> actualResult = outMessages.stream()
+ .map(x -> (GenericRecord) x.getMessage())
+ .filter(x -> (Boolean) x.get("bool_value"))
+ .filter(x -> x.get("string_value") != null &&
!x.get("string_value").toString().isEmpty())
+ .filter(x -> x.get("map_values") instanceof Map)
+ .filter(x -> x.get("id") instanceof Integer)
+ .filter(x -> (Long) x.get("long_value") < 10 && (Long)
x.get("long_value") >= 0)
+ .filter(x -> x.get("double_value") instanceof Double && (Double)
x.get("double_value") >= 1234.0)
+ .collect(Collectors.toList());
+ Assert.assertEquals(
+ "Wrong results size, check the test condition against the Actual
outputs -> " + outMessages.toString(),
+ numMessages, actualResult.size());
+ }
+
+
+ @Test
+ public void testEndToEndNestedRecordProjectFilter() throws
SamzaSqlValidatorException {
+ int numMessages = 10;
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+
+ String sql1 = " Insert into testavro.PROFILE1 select
(p.address.streetnum.number * p.address.zip) as id , "
+ + " p.address, `phoneNumbers`[1].`kind` = 'Home' as selfEmployed, "
+ + " MAP[cast(id as varchar), `phoneNumbers`[1].number] as mapValues,
phoneNumbers, "
+ + " cast(companyId as varchar) || name ||`phoneNumbers`[1].number ||
'concat' as name , "
+ + " 100 * ((companyId + 122) / 3 ) as companyId "
+ + " from testavro.PROFILE as p where p.address.zip > 0 "
+ + " and p.address.zip < 100003 and p.address.streetnum.number > 0 ";
+
+ List<String> sqlStmts = Collections.singletonList(sql1);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
+ List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
+ Assert.assertEquals(numMessages, outMessages.size());
+ }
+
@Test
public void testEndToEndFlattenWithUdf() throws Exception {
int numMessages = 20;
@@ -647,7 +715,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.PROFILE1(id, address) "
- + "select id, BuildOutputRecord('key', GetNestedField(address, 'zip'))
as address from testavro.PROFILE";
+ + "select id, BuildOutputRecord('key', p.address.zip) as address from
testavro.PROFILE as p";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
runApplication(new MapConfig(staticConfigs));
@@ -724,7 +792,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
+ " p.name as profileName, p.address as profileAddress "
+ "from testavro.PROFILE.`$table` as p "
+ "join testavro.PAGEVIEW as pv "
- + " on p.id = pv.profileId";
+ + " on p.id = pv.profileId where p.name = 'Mike' or p.name is not
null";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -745,7 +813,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndStreamTableInnerJoinWithPrimaryKey() throws
Exception {
+ public void testEndToEndStreamTableInnerJoinWithPrimaryKey() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index c41038d..bd541c6 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -138,7 +138,7 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
+ " p.name as profileName, p.address as profileAddress "
+ "from testRemoteStore.Profile.`$table` as p "
+ "join testavro.PAGEVIEW as pv "
- + " on p.__key__ = pv.profileId";
+ + " on p.__key__ = pv.profileId where p.name is not null";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -336,7 +336,7 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
+ " p.name as profileName, p.address as profileAddress "
+ "from testRemoteStore.Profile.`$table` as p "
+ "right join testavro.PAGEVIEW as pv "
- + " on p.__key__ = pv.profileId";
+ + " on p.__key__ = pv.profileId where p.name is null or p.name <>
'0'";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -400,6 +400,82 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
@Test
+ public void testSourceEndToEndWithFilterAndInnerJoin() throws
SamzaSqlValidatorException {
+ int numMessages = 20;
+ TestAvroSystemFactory.messages.clear();
+ RemoteStoreIOResolverTestFactory.records.clear();
+ Map<String, String> staticConfigs =
+ SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(),
numMessages, true);
+ populateProfileTable(staticConfigs, numMessages);
+
+ String sql = "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null,
'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testavro.PAGEVIEW as pv "
+ + "join testRemoteStore.Profile.`$table` as p "
+ + " on p.__key__ = pv.profileId"
+ + " where p.name <> 'Mike' ";
+
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
+ List<String> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
"," + (
+ ((GenericRecord) x.getMessage()).get("profileName") == null ?
"null"
+ : ((GenericRecord)
x.getMessage()).get("profileName").toString()))
+ .collect(Collectors.toList());
+ List<String> expectedOutMessages =
TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages)
+ .stream()
+ .filter(x -> !x.contains("Mike"))
+ .collect(Collectors.toList());
+ Assert.assertEquals(expectedOutMessages, outMessages);
+ }
+
+ @Test
+ public void testSourceEndToEndWithFilterAndLeftOuterJoin() throws
SamzaSqlValidatorException {
+ int numMessages = 20;
+ TestAvroSystemFactory.messages.clear();
+ RemoteStoreIOResolverTestFactory.records.clear();
+ Map<String, String> staticConfigs =
+ SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(),
numMessages, true);
+ populateProfileTable(staticConfigs, numMessages);
+
+ String sql = "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null,
'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testavro.PAGEVIEW as pv "
+ + " LEFT Join testRemoteStore.Profile.`$table` as p "
+ + " on pv.profileId + 1 - (2/2) = p.__key__ "
+ + " where p.name <> 'Mary' or p.name is null";
+
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
+ List<String> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
"," + (
+ ((GenericRecord) x.getMessage()).get("profileName") == null ?
"null"
+ : ((GenericRecord)
x.getMessage()).get("profileName").toString()))
+ .collect(Collectors.toList());
+ List<String> expectedOutMessages =
+
TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages)
+ .stream()
+ .filter(x -> !x.contains("Mary"))
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(expectedOutMessages, outMessages);
+ }
+
+ @Test
public void testSameJoinTargetSinkEndToEndRightOuterJoin() throws
SamzaSqlValidatorException {
int numMessages = 21;