This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new fff0bb0  SAMZA-2507: Bumping calciteVersion 1.19.0-->1.22.0. (#1341)
fff0bb0 is described below

commit fff0bb072f3a118eb8cf71257f54ef3aa8eedbd6
Author: Aditya Toomula <[email protected]>
AuthorDate: Wed Apr 8 21:23:16 2020 -0700

    SAMZA-2507: Bumping calciteVersion 1.19.0-->1.22.0. (#1341)
    
    * SAMZA-2507: Bumping calciteVersion 1.19.0-->1.22.0. This has fixes for subqueries with joins and first-class support for case-insensitive UDFs.
---
 build.gradle                                       |  1 +
 gradle/dependency-versions.gradle                  |  2 +-
 .../apache/samza/sql/data/RexToJavaCompiler.java   |  1 -
 .../apache/samza/sql/data/SamzaSqlRelMessage.java  |  2 +-
 .../apache/samza/sql/fn/BuildOutputRecordUdf.java  |  2 +-
 .../samza/sql/impl/ConfigBasedUdfResolver.java     |  2 +-
 .../apache/samza/sql/interfaces/SqlIOConfig.java   |  2 +-
 .../apache/samza/sql/interfaces/UdfMetadata.java   |  8 ++---
 .../org/apache/samza/sql/planner/QueryPlanner.java |  6 +++-
 .../sql/planner/SamzaSqlUdfOperatorTable.java      | 10 ++----
 .../samza/sql/planner/SamzaSqlValidator.java       |  4 +--
 .../sql/runner/SamzaSqlApplicationConfig.java      |  4 +--
 .../samza/sql/translator/JoinTranslator.java       |  7 ++--
 .../samza/sql/translator/QueryTranslator.java      |  2 +-
 .../sql/translator/SamzaSqlTableJoinFunction.java  |  2 +-
 .../samza/sql/translator/ScanTranslator.java       |  3 +-
 .../java/org/apache/samza/sql/util/JsonUtil.java   |  2 +-
 .../org/apache/samza/sql/util/SqlFileParser.java   |  4 +--
 .../sql/runner/TestSamzaSqlApplicationConfig.java  |  2 +-
 .../sql/system/ConsoleLoggingSystemFactory.java    |  4 +--
 .../samza/sql/translator/TestJoinTranslator.java   |  6 ++--
 .../udf/impl/TestReflectionBasedUdfResolver.java   |  2 +-
 .../apache/samza/test/framework/TestRunner.java    |  2 +-
 .../test/performance/TestKeyValuePerformance.scala |  2 +-
 .../harness/InMemoryIntegrationTestHarness.java    |  2 +-
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  | 39 ++++++++++++++++++++--
 .../samza/tools/ConsoleLoggingSystemFactory.java   |  4 +--
 .../apache/samza/tools/avro/AvroSerDeFactory.java  |  2 +-
 .../samza/tools/json/JsonRelConverterFactory.java  |  4 +--
 29 files changed, 81 insertions(+), 52 deletions(-)

diff --git a/build.gradle b/build.gradle
index 47e134e..94d353b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -308,6 +308,7 @@ project(":samza-sql_$scalaSuffix") {
     compile project(":samza-kv-rocksdb_$scalaSuffix")
     compile "org.apache.avro:avro:$avroVersion"
     compile "org.apache.calcite:calcite-core:$calciteVersion"
+    compile "org.codehaus.janino:commons-compiler:2.7.6"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     compile "org.reflections:reflections:0.9.10"
 
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index dfe6978..061f0b7 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -19,7 +19,7 @@
  ext {
   apacheCommonsCollections4Version = "4.0"
   avroVersion = "1.7.7"
-  calciteVersion = "1.19.0"
+  calciteVersion = "1.22.0"
   commonsCliVersion = "1.2"
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
index bc881d3..098d1a3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -46,7 +46,6 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
-import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.calcite.util.Pair;
 import org.apache.samza.SamzaException;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 791d79c..84c3be8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.codehaus.jackson.annotate.JsonProperty;
 
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java 
b/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
index 7f9de1a..3c427cc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
@@ -21,7 +21,7 @@ package org.apache.samza.sql.fn;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.context.Context;
 import org.apache.samza.sql.SamzaSqlRelRecord;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
index 2b83b60..2b9381d 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedUdfResolver.java
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.interfaces.UdfMetadata;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 4350889..9d8201c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java 
b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
index e3c5d60..9e15b21 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/UdfMetadata.java
@@ -35,7 +35,6 @@ public class UdfMetadata {
   // retains the name as it is given to UdfMetadata.
   // For example: if displayName is 'GetSqlField', name would be 'GETSQLFIELD'.
   private final String name;
-  private final String displayName;
 
   private final String description;
   private final Method udfMethod;
@@ -47,10 +46,7 @@ public class UdfMetadata {
 
   public UdfMetadata(String name, String description, Method udfMethod, Config udfConfig, List<SamzaSqlFieldType> arguments,
       SamzaSqlFieldType returnType, boolean disableArgCheck) {
-    // Udfs are case insensitive
-    this.name = name.toUpperCase();
-    // Let's also store the original name for display purposes.
-    this.displayName = name;
+    this.name = name;
     this.description = description;
     this.udfMethod = udfMethod;
     this.udfConfig = udfConfig;
@@ -88,7 +84,7 @@ public class UdfMetadata {
    * @return Returns the name of the Udf for display purposes.
    */
   public String getDisplayName() {
-    return displayName;
+    return getName();
   }
 
   /**
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index c09777d..8990bf1 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -131,7 +131,11 @@ public class QueryPlanner {
 
       // Using lenient so that !=,%,- are allowed.
       FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-          .parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).setConformance(SqlConformanceEnum.LENIENT).build())
+          .parserConfig(SqlParser.configBuilder()
+              .setLex(Lex.JAVA)
+              .setConformance(SqlConformanceEnum.LENIENT)
+              .setCaseSensitive(false) // Make Udfs case insensitive
+              .build())
           .defaultSchema(rootSchema)
           .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
           .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
index eb215c8..c56cf17 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlUdfOperatorTable.java
@@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.util.ListSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 
@@ -71,13 +72,8 @@ public class SamzaSqlUdfOperatorTable implements SqlOperatorTable {
 
   @Override
   public void lookupOperatorOverloads(SqlIdentifier opName, SqlFunctionCategory category, SqlSyntax syntax,
-      List<SqlOperator> operatorList) {
-    SqlIdentifier upperCaseOpName = opName;
-    // Only udfs are case insensitive
-    if (category != null && category.equals(SqlFunctionCategory.USER_DEFINED_FUNCTION)) {
-      upperCaseOpName = new SqlIdentifier(opName.names.get(0).toUpperCase(), opName.getComponentParserPosition(0));
-    }
-    operatorTable.lookupOperatorOverloads(upperCaseOpName, category, syntax, operatorList);
+      List<SqlOperator> operatorList, SqlNameMatcher nameMatcher) {
+    operatorTable.lookupOperatorOverloads(opName, category, syntax, operatorList, nameMatcher);
   }
 
   @Override
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index d0c51a1..760da6d 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -34,8 +34,8 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 3d4047e..8dc6821 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -35,8 +35,8 @@ import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.core.TableModify;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 635d0ba..ad126e2 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -39,7 +38,7 @@ import org.apache.calcite.sql.SqlExplainFormat;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -328,10 +327,10 @@ class JoinTranslator {
 
     // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
     // stream-table-table join, the left side of the join is join output, which we always
-    // assume to be a stream. The intermediate stream won't be an instance of EnumerableTableScan.
+    // assume to be a stream. The intermediate stream won't be an instance of TableScan.
     // The join key(s) for the table could be an udf in which case the relNode would be LogicalProject.
 
-    if (relNode instanceof EnumerableTableScan || relNode instanceof LogicalProject) {
+    if (relNode instanceof TableScan || relNode instanceof LogicalProject) {
       SqlIOConfig sourceTableConfig = resolveSQlIOForTable(relNode, context);
       if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
         return JoinInputNode.InputType.STREAM;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index fc16326..8247921 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -32,7 +32,7 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.ApplicationContainerContext;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
index 0715af3..cd7ecdc 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -22,7 +22,7 @@ package org.apache.samza.sql.translator;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index aefa04b..f58140d 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -22,7 +22,7 @@ package org.apache.samza.sql.translator;
 import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.ContainerContext;
@@ -37,7 +37,6 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.SamzaSqlInputMessage;
 import org.apache.samza.sql.SamzaSqlInputTransformer;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java 
b/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
index afd6490..47c761a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/JsonUtil.java
@@ -22,7 +22,7 @@ package org.apache.samza.sql.util;
 import java.io.IOException;
 import java.io.StringWriter;
 
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java 
b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
index f996987..d68eba1 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java
@@ -26,8 +26,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index 725da90..3e31e70 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -168,7 +168,7 @@ public class TestSamzaSqlApplicationConfig {
               .collect(Collectors.toList()),
           queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
       Assert.fail();
-    } catch (IllegalArgumentException e) {
+    } catch (NullPointerException e) {
       // swallow
     }
   }
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
index aa5ac1b..9d9ac0c 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/system/ConsoleLoggingSystemFactory.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.sql.system;
 
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -41,7 +41,7 @@ public class ConsoleLoggingSystemFactory implements SystemFactory {
 
   @Override
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
-    throw new NotImplementedException();
+    throw new NotImplementedException("Not Implemented");
   }
 
   @Override
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index e8f2d1f..b8fe7c2 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -24,11 +24,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -79,7 +79,7 @@ import static org.mockito.Mockito.when;
  * Tests for {@link JoinTranslator}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({LogicalJoin.class, EnumerableTableScan.class})
+@PrepareForTest({LogicalJoin.class, LogicalTableScan.class})
 public class TestJoinTranslator extends TranslatorTestBase {
 
   @Test
@@ -98,7 +98,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
     final int queryId = 0;
     LogicalJoin mockJoin = PowerMockito.mock(LogicalJoin.class);
     TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
-    RelNode mockLeftInput = PowerMockito.mock(EnumerableTableScan.class);
+    RelNode mockLeftInput = PowerMockito.mock(LogicalTableScan.class);
     RelNode mockRightInput = mock(RelNode.class);
     List<RelNode> inputs = new ArrayList<>();
     inputs.add(mockLeftInput);
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
index d4c96af..af2259c 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/udf/impl/TestReflectionBasedUdfResolver.java
@@ -50,7 +50,7 @@ public class TestReflectionBasedUdfResolver {
     Collection<UdfMetadata> udfMetadataList = reflectionBasedUdfResolver.getUdfs();
 
     Method method = TestSamzaSqlUdf.class.getMethod("execute", String.class);
-    UdfMetadata udfMetadata = new UdfMetadata("TESTSAMZASQLUDF",
+    UdfMetadata udfMetadata = new UdfMetadata("TestSamzaSqlUdf",
             "Test samza sql udf implementation", method, new MapConfig(), ImmutableList.of(SamzaSqlFieldType.STRING),
                SamzaSqlFieldType.STRING, true);
 
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index bca168e..e2db379 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.application.SamzaApplication;
diff --git 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 9eaed56..46f345d 100644
--- 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -28,7 +28,7 @@ import java.util.UUID
 import com.google.common.base.Stopwatch
 import com.google.common.collect.ImmutableList
 import com.google.common.collect.ImmutableMap
-import org.apache.commons.lang.RandomStringUtils
+import org.apache.commons.lang3.RandomStringUtils
 import org.apache.samza.config.{Config, JobConfig, MapConfig, StorageConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.context.ContainerContextImpl
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
index f120ac3..0c1f85f 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
@@ -21,7 +21,7 @@ package org.apache.samza.test.harness;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.InMemorySystemConfig;
 import org.apache.samza.config.JobConfig;
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index b6b8a96..b70eb2c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -1052,7 +1052,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndStreamTableTableJoinWithPrimaryKeys() throws Exception {
+  public void testEndToEndStreamTableNestedJoinWithPrimaryKeys() throws Exception {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -1086,7 +1086,42 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndStreamTableTableJoinWithCompositeKey() throws Exception {
+  public void testEndToEndStreamTableNestedJoinWithSubQuery() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    String sql =
+      "Insert into testavro.enrichedPageViewTopic "
+            + "select t.pageKey as __key__, t.pageKey as pageKey, c.name as companyName, t.profileName as profileName,"
+            + "       address as profileAddress "
+            + "from (select p.companyId as companyId, p.name as profileName, p.address as address, pv.pageKey as pageKey"
+            + "      from testavro.PAGEVIEW as pv "
+            + "      join testavro.PROFILE.`$table` as p "
+            + "      on MyTest(p.__key__) = MyTest(pv.profileId)) as t "
+            + "join testavro.COMPANY.`$table` as c "
+            + "on MyTest(t.companyId) = MyTest(c.__key__)";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + ((GenericRecord) x.getMessage()).get("profileName").toString() + ","
+            + ((GenericRecord) x.getMessage()).get("companyName").toString())
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(numMessages);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
+  public void testEndToEndStreamTableNestedJoinWithCompositeKey() throws Exception {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
index 4f18cd8..d544e12 100644
--- 
a/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/ConsoleLoggingSystemFactory.java
@@ -26,7 +26,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -55,7 +55,7 @@ public class ConsoleLoggingSystemFactory implements SystemFactory {
 
   @Override
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
-    throw new NotImplementedException();
+    throw new NotImplementedException("Not Implemented");
   }
 
   @Override
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
index a052306..27ebfed 100644
--- 
a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSerDeFactory.java
@@ -31,7 +31,7 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.serializers.Serde;
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
index 4db066a..f2c3769 100644
--- 
a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
@@ -21,7 +21,7 @@ package org.apache.samza.tools.json;
 
 import java.io.IOException;
 import java.util.List;
-import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
@@ -51,7 +51,7 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
 
     @Override
     public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> kv) {
-      throw new NotImplementedException();
+      throw new NotImplementedException("Not implemented.");
     }
 
     @Override

Reply via email to