This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new fff0bb0 SAMZA-2507: Bumping calciteVersion 1.19.0-->1.22.0. (#1341)
fff0bb0 is described below
commit fff0bb072f3a118eb8cf71257f54ef3aa8eedbd6
Author: Aditya Toomula <[email protected]>
AuthorDate: Wed Apr 8 21:23:16 2020 -0700
SAMZA-2507: Bumping calciteVersion 1.19.0-->1.22.0. (#1341)
* SAMZA-2507: Bumping calciteVersion 1.19.0-->1.22.0. This has fixes for
subqueries with joins and first class support for case insensitive Udfs.
---
build.gradle | 1 +
gradle/dependency-versions.gradle | 2 +-
.../apache/samza/sql/data/RexToJavaCompiler.java | 1 -
.../apache/samza/sql/data/SamzaSqlRelMessage.java | 2 +-
.../apache/samza/sql/fn/BuildOutputRecordUdf.java | 2 +-
.../samza/sql/impl/ConfigBasedUdfResolver.java | 2 +-
.../apache/samza/sql/interfaces/SqlIOConfig.java | 2 +-
.../apache/samza/sql/interfaces/UdfMetadata.java | 8 ++---
.../org/apache/samza/sql/planner/QueryPlanner.java | 6 +++-
.../sql/planner/SamzaSqlUdfOperatorTable.java | 10 ++----
.../samza/sql/planner/SamzaSqlValidator.java | 4 +--
.../sql/runner/SamzaSqlApplicationConfig.java | 4 +--
.../samza/sql/translator/JoinTranslator.java | 7 ++--
.../samza/sql/translator/QueryTranslator.java | 2 +-
.../sql/translator/SamzaSqlTableJoinFunction.java | 2 +-
.../samza/sql/translator/ScanTranslator.java | 3 +-
.../java/org/apache/samza/sql/util/JsonUtil.java | 2 +-
.../org/apache/samza/sql/util/SqlFileParser.java | 4 +--
.../sql/runner/TestSamzaSqlApplicationConfig.java | 2 +-
.../sql/system/ConsoleLoggingSystemFactory.java | 4 +--
.../samza/sql/translator/TestJoinTranslator.java | 6 ++--
.../udf/impl/TestReflectionBasedUdfResolver.java | 2 +-
.../apache/samza/test/framework/TestRunner.java | 2 +-
.../test/performance/TestKeyValuePerformance.scala | 2 +-
.../harness/InMemoryIntegrationTestHarness.java | 2 +-
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 39 ++++++++++++++++++++--
.../samza/tools/ConsoleLoggingSystemFactory.java | 4 +--
.../apache/samza/tools/avro/AvroSerDeFactory.java | 2 +-
.../samza/tools/json/JsonRelConverterFactory.java | 4 +--
29 files changed, 81 insertions(+), 52 deletions(-)
diff --git a/build.gradle b/build.gradle
index 47e134e..94d353b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -308,6 +308,7 @@ project(":samza-sql_$scalaSuffix") {
compile project(":samza-kv-rocksdb_$scalaSuffix")
compile "org.apache.avro:avro:$avroVersion"
compile "org.apache.calcite:calcite-core:$calciteVersion"
+ compile "org.codehaus.janino:commons-compiler:2.7.6"
compile "org.slf4j:slf4j-api:$slf4jVersion"
compile "org.reflections:reflections:0.9.10"
diff --git a/gradle/dependency-versions.gradle
b/gradle/dependency-versions.gradle
index dfe6978..061f0b7 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -19,7 +19,7 @@
ext {
apacheCommonsCollections4Version = "4.0"
avroVersion = "1.7.7"
- calciteVersion = "1.19.0"
+ calciteVersion = "1.22.0"
commonsCliVersion = "1.2"
commonsCodecVersion = "1.9"
commonsCollectionVersion = "3.2.1"
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
index bc881d3..098d1a3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -46,7 +46,6 @@ import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
-import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.util.Pair;
import org.apache.samza.SamzaException;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 791d79c..84c3be8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.sql.SamzaSqlRelRecord;
import org.codehaus.jackson.annotate.JsonProperty;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
b/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
index 7f9de1a..3c427cc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
@@ -21,7 +21,7 @@ package org.apache.samza.sql.fn;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.sql.SamzaSqlRelRecord;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
index 2b83b60..2b9381d 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
@@ -28,7 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.sql.interfaces.UdfMetadata;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 4350889..9d8201c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Optional;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
index e3c5d60..9e15b21 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
@@ -35,7 +35,6 @@ public class UdfMetadata {
// retains the name as it is given to UdfMetadata.
// For example: if displayName is 'GetSqlField', name would be 'GETSQLFIELD'.
private final String name;
- private final String displayName;
private final String description;
private final Method udfMethod;
@@ -47,10 +46,7 @@ public class UdfMetadata {
public UdfMetadata(String name, String description, Method udfMethod, Config
udfConfig, List<SamzaSqlFieldType> arguments,
SamzaSqlFieldType returnType, boolean disableArgCheck) {
- // Udfs are case insensitive
- this.name = name.toUpperCase();
- // Let's also store the original name for display purposes.
- this.displayName = name;
+ this.name = name;
this.description = description;
this.udfMethod = udfMethod;
this.udfConfig = udfConfig;
@@ -88,7 +84,7 @@ public class UdfMetadata {
* @return Returns the name of the Udf for display purposes.
*/
public String getDisplayName() {
- return displayName;
+ return getName();
}
/**
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index c09777d..8990bf1 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -131,7 +131,11 @@ public class QueryPlanner {
// Using lenient so that !=,%,- are allowed.
FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-
.parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).setConformance(SqlConformanceEnum.LENIENT).build())
+ .parserConfig(SqlParser.configBuilder()
+ .setLex(Lex.JAVA)
+ .setConformance(SqlConformanceEnum.LENIENT)
+ .setCaseSensitive(false) // Make Udfs case insensitive
+ .build())
.defaultSchema(rootSchema)
.operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
.sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
index eb215c8..c56cf17 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
@@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.util.ListSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.samza.sql.interfaces.UdfMetadata;
@@ -71,13 +72,8 @@ public class SamzaSqlUdfOperatorTable implements
SqlOperatorTable {
@Override
public void lookupOperatorOverloads(SqlIdentifier opName,
SqlFunctionCategory category, SqlSyntax syntax,
- List<SqlOperator> operatorList) {
- SqlIdentifier upperCaseOpName = opName;
- // Only udfs are case insensitive
- if (category != null &&
category.equals(SqlFunctionCategory.USER_DEFINED_FUNCTION)) {
- upperCaseOpName = new SqlIdentifier(opName.names.get(0).toUpperCase(),
opName.getComponentParserPosition(0));
- }
- operatorTable.lookupOperatorOverloads(upperCaseOpName, category, syntax,
operatorList);
+ List<SqlOperator> operatorList, SqlNameMatcher nameMatcher) {
+ operatorTable.lookupOperatorOverloads(opName, category, syntax,
operatorList, nameMatcher);
}
@Override
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index d0c51a1..760da6d 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -34,8 +34,8 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 3d4047e..8dc6821 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -35,8 +35,8 @@ import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.TableModify;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 635d0ba..ad126e2 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
@@ -39,7 +38,7 @@ import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
@@ -328,10 +327,10 @@ class JoinTranslator {
// NOTE: Any intermediate form of a join is always a stream. Eg: For the
second level join of
// stream-table-table join, the left side of the join is join output,
which we always
- // assume to be a stream. The intermediate stream won't be an instance of
EnumerableTableScan.
+ // assume to be a stream. The intermediate stream won't be an instance of
TableScan.
// The join key(s) for the table could be an udf in which case the relNode
would be LogicalProject.
- if (relNode instanceof EnumerableTableScan || relNode instanceof
LogicalProject) {
+ if (relNode instanceof TableScan || relNode instanceof LogicalProject) {
SqlIOConfig sourceTableConfig = resolveSQlIOForTable(relNode, context);
if (sourceTableConfig == null ||
!sourceTableConfig.getTableDescriptor().isPresent()) {
return JoinInputNode.InputType.STREAM;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index fc16326..8247921 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -32,7 +32,7 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.ApplicationContainerContext;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
index 0715af3..cd7ecdc 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -22,7 +22,7 @@ package org.apache.samza.sql.translator;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.sql.SamzaSqlRelRecord;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index aefa04b..f58140d 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -22,7 +22,7 @@ package org.apache.samza.sql.translator;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.core.TableScan;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.ContainerContext;
@@ -37,7 +37,6 @@ import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.SamzaSqlInputMessage;
import org.apache.samza.sql.SamzaSqlInputTransformer;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
b/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
index afd6490..47c761a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
@@ -22,7 +22,7 @@ package org.apache.samza.sql.util;
import java.io.IOException;
import java.io.StringWriter;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
index f996987..d68eba1 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
@@ -26,8 +26,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index 725da90..3e31e70 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -168,7 +168,7 @@ public class TestSamzaSqlApplicationConfig {
.collect(Collectors.toList()),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
Assert.fail();
- } catch (IllegalArgumentException e) {
+ } catch (NullPointerException e) {
// swallow
}
}
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
b/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
index aa5ac1b..9d9ac0c 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
@@ -19,7 +19,7 @@
package org.apache.samza.sql.system;
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -41,7 +41,7 @@ public class ConsoleLoggingSystemFactory implements
SystemFactory {
@Override
public SystemConsumer getConsumer(String systemName, Config config,
MetricsRegistry registry) {
- throw new NotImplementedException();
+ throw new NotImplementedException("Not Implemented");
}
@Override
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index e8f2d1f..b8fe7c2 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -24,11 +24,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
@@ -79,7 +79,7 @@ import static org.mockito.Mockito.when;
* Tests for {@link JoinTranslator}
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({LogicalJoin.class, EnumerableTableScan.class})
+@PrepareForTest({LogicalJoin.class, LogicalTableScan.class})
public class TestJoinTranslator extends TranslatorTestBase {
@Test
@@ -98,7 +98,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
final int queryId = 0;
LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
- RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
+ RelNode mockLeftInput = PowerMockito.mock(LogicalTableScan.class);
RelNode mockRightInput = mock(RelNode.class);
List<RelNode> inputs = new ArrayList<>();
inputs.add(mockLeftInput);
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
index d4c96af..af2259c 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
@@ -50,7 +50,7 @@ public class TestReflectionBasedUdfResolver {
Collection<UdfMetadata> udfMetadataList =
reflectionBasedUdfResolver.getUdfs();
Method method = TestSamzaSqlUdf.class.getMethod("execute", String.class);
- UdfMetadata udfMetadata = new UdfMetadata("TESTSAMZASQLUDF",
+ UdfMetadata udfMetadata = new UdfMetadata("TestSamzaSqlUdf",
"Test samza sql udf implementation", method, new MapConfig(),
ImmutableList.of(SamzaSqlFieldType.STRING),
SamzaSqlFieldType.STRING, true);
diff --git
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index bca168e..e2db379 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -29,7 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.SamzaApplication;
diff --git
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 9eaed56..46f345d 100644
---
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -28,7 +28,7 @@ import java.util.UUID
import com.google.common.base.Stopwatch
import com.google.common.collect.ImmutableList
import com.google.common.collect.ImmutableMap
-import org.apache.commons.lang.RandomStringUtils
+import org.apache.commons.lang3.RandomStringUtils
import org.apache.samza.config.{Config, JobConfig, MapConfig, StorageConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.context.ContainerContextImpl
diff --git
a/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
b/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
index f120ac3..0c1f85f 100644
---
a/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
+++
b/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
@@ -21,7 +21,7 @@ package org.apache.samza.test.harness;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.InMemorySystemConfig;
import org.apache.samza.config.JobConfig;
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index b6b8a96..b70eb2c 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -1052,7 +1052,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndStreamTableTableJoinWithPrimaryKeys() throws
Exception {
+ public void testEndToEndStreamTableNestedJoinWithPrimaryKeys() throws
Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -1086,7 +1086,42 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
- public void testEndToEndStreamTableTableJoinWithCompositeKey() throws
Exception {
+ public void testEndToEndStreamTableNestedJoinWithSubQuery() throws Exception
{
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select t.pageKey as __key__, t.pageKey as pageKey, c.name as
companyName, t.profileName as profileName,"
+ + " address as profileAddress "
+ + "from (select p.companyId as companyId, p.name as profileName,
p.address as address, pv.pageKey as pageKey"
+ + " from testavro.PAGEVIEW as pv "
+ + " join testavro.PROFILE.`$table` as p "
+ + " on MyTest(p.__key__) = MyTest(pv.profileId)) as t "
+ + "join testavro.COMPANY.`$table` as c "
+ + "on MyTest(t.companyId) = MyTest(c.__key__)";
+
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
+ List<String> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() +
","
+ + ((GenericRecord) x.getMessage()).get("profileName").toString() +
","
+ + ((GenericRecord) x.getMessage()).get("companyName").toString())
+ .collect(Collectors.toList());
+ Assert.assertEquals(numMessages, outMessages.size());
+ List<String> expectedOutMessages =
TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(numMessages);
+ Assert.assertEquals(expectedOutMessages, outMessages);
+ }
+
+ @Test
+ public void testEndToEndStreamTableNestedJoinWithCompositeKey() throws
Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
diff --git
a/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
index 4f18cd8..d544e12 100644
---
a/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
+++
b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
@@ -26,7 +26,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
@@ -55,7 +55,7 @@ public class ConsoleLoggingSystemFactory implements
SystemFactory {
@Override
public SystemConsumer getConsumer(String systemName, Config config,
MetricsRegistry registry) {
- throw new NotImplementedException();
+ throw new NotImplementedException("Not Implemented");
}
@Override
diff --git
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
index a052306..27ebfed 100644
---
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
+++
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
@@ -31,7 +31,7 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
diff --git
a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
index 4db066a..f2c3769 100644
---
a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
+++
b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
@@ -21,7 +21,7 @@ package org.apache.samza.tools.json;
import java.io.IOException;
import java.util.List;
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
@@ -51,7 +51,7 @@ public class JsonRelConverterFactory implements
SamzaRelConverterFactory {
@Override
public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> kv) {
- throw new NotImplementedException();
+ throw new NotImplementedException("Not implemented.");
}
@Override