This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f5faae0  [SAMZA-2557] Adding support for nested rows access via dot 
path. (#1386)
f5faae0 is described below

commit f5faae09fa45844ba55326a7f10a475d9e1ee8e3
Author: Slim Bouguerra <[email protected]>
AuthorDate: Wed Jul 22 08:52:05 2020 -0700

    [SAMZA-2557] Adding support for nested rows access via dot path. (#1386)
    
    * Working version still need to work on extracting nested fields udf
    
    * working version end to end with Filter optimization
    
    * left outer join test with filters
    
    * adding more comments
    
    * fix the type converter used by udfs
    
    * refix the test
    
    * Added GetNestedField built in operator to allow support backward 
comaptiblity
    
    * fix java doc and minor change on the type cast
    
    Not sure what this test is testing for it is a regular join between Stream 
and local table
    
    * Adding more tests and some logging to help read the compiled code
    
    * Adding some Type sanity to the Join functions
    
    * fix minor WAR
    
    * revert unwanted changes
    
    * fix the tests
    
    * Adding more tests for map type and fix the Avro conversion for type with 
one type only
    
    * fix the style checks
---
 .../apache/samza/sql/avro/AvroRelConverter.java    |  10 +-
 .../apache/samza/sql/avro/AvroTypeFactoryImpl.java |   6 +-
 .../org/apache/samza/sql/fn/GetNestedFieldUdf.java |  42 ----
 .../java/org/apache/samza/sql/planner/Checker.java |  11 +-
 .../org/apache/samza/sql/planner/QueryPlanner.java |  92 ++++-----
 .../samza/sql/planner/RelSchemaConverter.java      |   7 +
 .../samza/sql/planner/SamzaSqlOperatorTable.java   |   2 +
 .../sql/runner/SamzaSqlApplicationRunner.java      |   1 +
 .../samza/sql/translator/FilterTranslator.java     |   4 +-
 .../apache/samza/sql/translator/JoinInputNode.java |  10 +-
 .../samza/sql/translator/JoinTranslator.java       |  30 ++-
 .../sql/translator/MessageStreamCollector.java     | 214 +++++++++++++++++++++
 .../samza/sql/translator/ProjectTranslator.java    | 139 ++++++++++++-
 .../SamzaSqlRemoteTableJoinFunction.java           |  49 ++++-
 .../sql/translator/SamzaSqlTableJoinFunction.java  |   8 +-
 .../samza/sql/translator/ScanTranslator.java       |   3 +
 .../samza/sql/translator/TranslatorContext.java    |   5 +-
 .../org/apache/samza/sql/udf/GetNestedField.java   | 171 ++++++++++++++++
 .../apache/samza/sql/planner/TestQueryPlanner.java |  12 +-
 .../samza/sql/system/TestAvroSystemFactory.java    |   3 +
 .../sql/translator/TestProjectTranslator.java      |  20 +-
 .../samza/sql/translator/TestQueryTranslator.java  |  48 ++---
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  |  82 +++++++-
 .../test/samzasql/TestSamzaSqlRemoteTable.java     |  80 +++++++-
 24 files changed, 864 insertions(+), 185 deletions(-)

diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
index b7cee00..c4138ea 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
@@ -201,10 +201,12 @@ public class AvroRelConverter implements 
SamzaRelConverter {
             .collect(Collectors.toList());
         return avroList;
       case MAP:
-        return ((Map<String, ?>) relObj).entrySet()
-            .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey,
-              e -> convertToAvroObject(e.getValue(), 
getNonNullUnionSchema(schema).getValueType())));
+        // If you ask why not using String and that is because some strings 
are Wrapped into org.apache.avro.util.Utf8
+        // TODO looking at the Utf8 code base it is not immutable, having it 
as a key is calling for trouble!
+        final Map<Object, Object> outputMap = new HashMap<>();
+        ((Map<Object, Object>) relObj).forEach((key, aValue) -> 
outputMap.put(key,
+            convertToAvroObject(aValue, 
getNonNullUnionSchema(schema).getValueType())));
+        return outputMap;
       case UNION:
         for (Schema unionSchema : schema.getTypes()) {
           if (isSchemaCompatibleWithRelObj(relObj, unionSchema)) {
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java 
b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
index 5dddf15..d837e03 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroTypeFactoryImpl.java
@@ -148,8 +148,10 @@ public class AvroTypeFactoryImpl extends 
SqlTypeFactoryImpl {
   }
 
   private SqlFieldSchema getSqlTypeFromUnionTypes(List<Schema> types, boolean 
isNullable, boolean isOptional) {
-    // Typically a nullable field's schema is configured as an union of Null 
and a Type.
-    if (types.size() == 2) {
+    if (types.size() == 1) {
+      return convertField(types.get(0), true, true);
+    } else if (types.size() == 2) {
+      // Typically a nullable field's schema is configured as an union of Null 
and a Type.
       if (types.get(0).getType() == Schema.Type.NULL) {
         return convertField(types.get(1), true, true);
       } else if (types.get(1).getType() == Schema.Type.NULL) {
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java 
b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
deleted file mode 100644
index 4ef2a11..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetNestedFieldUdf.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.fn;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.context.Context;
-import org.apache.samza.sql.schema.SamzaSqlFieldType;
-import org.apache.samza.sql.udfs.SamzaSqlUdf;
-import org.apache.samza.sql.udfs.SamzaSqlUdfMethod;
-import org.apache.samza.sql.udfs.ScalarUdf;
-
-
-@SamzaSqlUdf(name = "GetNestedField", description = "UDF that extracts a field 
value from a nested SamzaSqlRelRecord")
-public class GetNestedFieldUdf implements ScalarUdf {
-  @Override
-  public void init(Config udfConfig, Context context) {
-  }
-
-  @SamzaSqlUdfMethod(params = {SamzaSqlFieldType.ANY, 
SamzaSqlFieldType.STRING},
-      returns = SamzaSqlFieldType.ANY)
-  public Object execute(Object currentFieldOrValue, String fieldName) {
-    GetSqlFieldUdf udf = new GetSqlFieldUdf();
-    return udf.getSqlField(currentFieldOrValue, fieldName);
-  }
-}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
index ccbee6a..60794ef 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/Checker.java
@@ -98,11 +98,11 @@ class Checker implements SqlOperandTypeChecker {
         if (parsedSqlArgType.getSqlTypeName() == SqlTypeName.CHAR && 
udfArgumentAsSqlType == SqlTypeName.VARCHAR) {
           return true;
         } else if (!Objects.equals(parsedSqlArgType.getSqlTypeName(), 
udfArgumentAsSqlType)
-                && 
!ANY_SQL_TYPE_NAMES.contains(parsedSqlArgType.getSqlTypeName()) && 
hasOneUdfMethod(udfMetadata)) {
+            && !ANY_SQL_TYPE_NAMES.contains(parsedSqlArgType.getSqlTypeName()) 
&& hasOneUdfMethod(udfMetadata)) {
           // 3(b). Throw up and fail on mismatch between the SamzaSqlType and 
CalciteType for any argument.
-          String msg = String.format("Type mismatch in udf class: %s at 
argument index: %d." +
-                          "Expected type: %s, actual type: %s.", 
udfMetadata.getName(),
-                  udfArgumentIndex, parsedSqlArgType.getSqlTypeName(), 
udfArgumentAsSqlType);
+          String msg = String.format(
+              "Type mismatch in udf class: %s at argument index: %d." + 
"Expected type: %s, actual type: %s.",
+              udfMetadata.getName(), udfArgumentIndex, udfArgumentAsSqlType, 
parsedSqlArgType.getSqlTypeName());
           LOG.error(msg);
           throw new SamzaSqlValidatorException(msg);
         }
@@ -159,8 +159,9 @@ class Checker implements SqlOperandTypeChecker {
   static SqlTypeName toCalciteSqlType(SamzaSqlFieldType samzaSqlFieldType) {
     switch (samzaSqlFieldType) {
       case ANY:
-      case ROW:
         return SqlTypeName.ANY;
+      case ROW:
+        return SqlTypeName.ROW;
       case MAP:
         return SqlTypeName.MAP;
       case ARRAY:
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 767df43..dc37753 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -20,15 +20,13 @@
 package org.apache.samza.sql.planner;
 
 import com.google.common.collect.ImmutableList;
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptUtil;
@@ -62,10 +60,10 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.apache.samza.sql.schema.SamzaSqlFieldType;
 import org.apache.samza.sql.schema.SqlFieldSchema;
 import org.apache.samza.sql.schema.SqlSchema;
-import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,56 +120,42 @@ public class QueryPlanner {
   }
 
   private Planner getPlanner() {
-    Planner planner = null;
-    try {
-      Connection connection = DriverManager.getConnection("jdbc:calcite:");
-      CalciteConnection calciteConnection = 
connection.unwrap(CalciteConnection.class);
-      SchemaPlus rootSchema = calciteConnection.getRootSchema();
-      registerSourceSchemas(rootSchema);
-
-      List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
-          .map(SamzaSqlScalarFunctionImpl::new)
-          .collect(Collectors.toList());
-
-      final List<RelTraitDef> traitDefs = new ArrayList<>();
-
-      traitDefs.add(ConventionTraitDef.INSTANCE);
-      traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-      List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-      sqlOperatorTables.add(new SamzaSqlOperatorTable());
-      sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
-
-      // TODO: Introduce a pluggable rule factory.
-      List<RelOptRule> rules = ImmutableList.of(
-          FilterProjectTransposeRule.INSTANCE,
-          ProjectMergeRule.INSTANCE,
-          new 
SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true, 
RelFactories.LOGICAL_BUILDER,
-          systemStreamConfigBySource));
-
-      // Using lenient so that !=,%,- are allowed.
-      FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-          .parserConfig(SqlParser.configBuilder()
-              .setLex(Lex.JAVA)
-              .setConformance(SqlConformanceEnum.LENIENT)
-              .setCaseSensitive(false) // Make Udfs case insensitive
-              .build())
-          .defaultSchema(rootSchema)
-          .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-          .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
-          .traitDefs(traitDefs)
-          .programs(Programs.hep(rules, true, 
DefaultRelMetadataProvider.INSTANCE))
-          .build();
-      planner = Frameworks.getPlanner(frameworkConfig);
-      return planner;
-    } catch (Exception e) {
-      String errorMsg = "Failed to create planner.";
-      LOG.error(errorMsg, e);
-      if (planner != null) {
-        planner.close();
-      }
-      throw new SamzaException(errorMsg, e);
-    }
+    Planner planner;
+    SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
+    registerSourceSchemas(rootSchema);
+
+    List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions =
+        
udfMetadata.stream().map(SamzaSqlScalarFunctionImpl::new).collect(Collectors.toList());
+
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
+
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+    sqlOperatorTables.add(new SamzaSqlOperatorTable());
+    sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
+
+    // TODO: Introduce a pluggable rule factory.
+    List<RelOptRule> rules = 
ImmutableList.of(FilterProjectTransposeRule.INSTANCE, ProjectMergeRule.INSTANCE,
+        new 
SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true, 
RelFactories.LOGICAL_BUILDER,
+            systemStreamConfigBySource));
+
+    // Using lenient so that !=,%,- are allowed.
+    FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder()
+            .setLex(Lex.JAVA)
+            .setConformance(SqlConformanceEnum.LENIENT)
+            .setCaseSensitive(false) // Make Udfs case insensitive
+            .build())
+        .defaultSchema(rootSchema)
+        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+        .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
+        .traitDefs(traitDefs)
+        .programs(Programs.hep(rules, true, 
DefaultRelMetadataProvider.INSTANCE))
+        .build();
+    planner = Frameworks.getPlanner(frameworkConfig);
+    return planner;
   }
 
   private RelRoot optimize(Planner planner, RelRoot relRoot) {
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index c3735a4..fcc289c 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -21,11 +21,13 @@ package org.apache.samza.sql.planner;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.type.ArraySqlType;
 import org.apache.calcite.sql.type.MapSqlType;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
@@ -94,6 +96,11 @@ public class RelSchemaConverter extends SqlTypeFactoryImpl {
       case INT64:
         return createTypeWithNullability(createSqlType(SqlTypeName.BIGINT), 
true);
       case ROW:
+        final RelDataType rowType = 
convertToRelSchema(fieldSchema.getRowSchema());
+        /* Using Fully Qualified names to ensure that at the last project the 
row is fully reconstructed */
+        return 
createTypeWithNullability(createStructType(StructKind.FULLY_QUALIFIED,
+            
rowType.getFieldList().stream().map(RelDataTypeField::getType).collect(Collectors.toList()),
+            rowType.getFieldNames()), true);
       case ANY:
         // TODO Calcite execution engine doesn't support record type yet.
         return createTypeWithNullability(createSqlType(SqlTypeName.ANY), true);
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
index 3098af7..6b8e8ba 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
@@ -35,6 +35,7 @@ import org.apache.calcite.sql.fun.SqlMultisetValueConstructor;
 import org.apache.calcite.sql.fun.SqlRowOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
+import org.apache.samza.sql.udf.GetNestedField;
 
 
 /**
@@ -143,6 +144,7 @@ public class SamzaSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
   public static final SqlFunction TUMBLE = SqlStdOperatorTable.TUMBLE;
   public static final SqlFunction TUMBLE_END = SqlStdOperatorTable.TUMBLE_END;
   public static final SqlFunction TUMBLE_START = 
SqlStdOperatorTable.TUMBLE_START;
+  public static final SqlFunction GET_NESTED_FIELD_OP = 
GetNestedField.INSTANCE;
 
   public SamzaSqlOperatorTable() {
     init();
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index ce42bee..23d7be5 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -61,6 +61,7 @@ public class SamzaSqlApplicationRunner implements 
ApplicationRunner {
    * NOTE: This constructor is called from {@link ApplicationRunners} through 
reflection.
    * Please refrain from updating the signature or removing this constructor 
unless the caller has changed the interface.
    */
+  @SuppressWarnings("unused") /* used via reflection */
   public SamzaSqlApplicationRunner(SamzaApplication app, Config config) {
     this(app, false, config);
   }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index 604f061..b42569c 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -80,6 +80,7 @@ class FilterTranslator {
       this.context = context;
       this.translatorContext = ((SamzaSqlApplicationContext) 
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
       this.filter = (LogicalFilter) 
this.translatorContext.getRelNode(filterId);
+      LOG.info("Compiling Operator {}", filter.getDigest());
       this.expr = 
this.translatorContext.getExpressionCompiler().compile(filter.getInputs(), 
Collections.singletonList(filter.getCondition()));
       ContainerContext containerContext = context.getContainerContext();
       metricsRegistry = containerContext.getContainerMetricsRegistry();
@@ -96,9 +97,10 @@ class FilterTranslator {
     public boolean apply(SamzaSqlRelMessage message) {
       long startProcessing = System.nanoTime();
       Object[] result = new Object[1];
+      Object[] inputRow = 
ProjectTranslator.convertToJavaRow(message.getSamzaSqlRelRecord());
       try {
         expr.execute(translatorContext.getExecutionContext(), context, 
translatorContext.getDataContext(),
-            message.getSamzaSqlRelRecord().getFieldValues().toArray(), result);
+            inputRow, result);
       } catch (Exception e) {
         String errMsg = String.format("Handling the following rel message ran 
into an error. %s", message);
         LOG.error(errMsg, e);
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
index 1a0a7ec..c51ff25 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
@@ -74,7 +74,7 @@ public class JoinInputNode {
   }
 
   String getSourceName() {
-    return 
SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
+    return 
SqlIOConfig.getSourceFromSourceParts(getTableScan(relNode).getTable().getQualifiedName());
   }
 
   boolean isPosOnRight() {
@@ -109,4 +109,12 @@ public class JoinInputNode {
     }
   }
 
+  private static TableScan getTableScan(RelNode relNode) {
+    if (relNode instanceof TableScan) {
+      return (TableScan) relNode;
+    }
+    // we deal with Single inputs filter/project
+    assert relNode.getInputs().size() == 1;
+    return getTableScan(relNode.getInput(0));
+  }
 }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index 02c434d..26d9fa0 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -40,6 +40,7 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSlot;
 import org.apache.calcite.sql.SqlExplainFormat;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlKind;
@@ -79,7 +80,7 @@ import static 
org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeK
 class JoinTranslator {
 
   private static final Logger log = 
LoggerFactory.getLogger(JoinTranslator.class);
-  private String logicalOpId;
+  private final String logicalOpId;
   private final String intermediateStreamPrefix;
   private final int queryId;
   private final TranslatorInputMetricsMapFunction inputMetricsMF;
@@ -165,8 +166,16 @@ class JoinTranslator {
 
     if (tableNode.isRemoteTable()) {
       String remoteTableName = tableNode.getSourceName();
-      StreamTableJoinFunction joinFn = new 
SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
-          context.getTableKeyConverter(remoteTableName), streamNode, 
tableNode, join.getJoinType(), queryId);
+      MessageStream<SamzaSqlRelMessage> operatorStack = 
context.getMessageStream(tableNode.getRelNode().getId());
+      final  StreamTableJoinFunction<Object, SamzaSqlRelMessage, KV, 
SamzaSqlRelMessage> joinFn;
+      if (operatorStack != null && operatorStack instanceof 
MessageStreamCollector) {
+        joinFn = new 
SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
+            context.getTableKeyConverter(remoteTableName), streamNode, 
tableNode, join.getJoinType(), queryId,
+            (MessageStreamCollector) operatorStack);
+      } else {
+        joinFn = new 
SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
+            context.getTableKeyConverter(remoteTableName), streamNode, 
tableNode, join.getJoinType(), queryId);
+      }
 
       return inputStream
           .map(inputMetricsMF)
@@ -175,7 +184,11 @@ class JoinTranslator {
 
     // Join with the local table
 
-    StreamTableJoinFunction joinFn = new 
SamzaSqlLocalTableJoinFunction(streamNode, tableNode, join.getJoinType());
+    StreamTableJoinFunction<SamzaSqlRelRecord,
+        SamzaSqlRelMessage,
+        KV<SamzaSqlRelRecord, SamzaSqlRelMessage>,
+        SamzaSqlRelMessage>
+        joinFn = new SamzaSqlLocalTableJoinFunction(streamNode, tableNode, 
join.getJoinType());
 
     SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde =
         (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new 
SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
@@ -270,8 +283,8 @@ class JoinTranslator {
         isTablePosOnRight ? join.getRowType().getFieldCount() : 
join.getLeft().getRowType().getFieldCount();
 
     List<Integer> tableRefsIdx = refCollector.stream()
-        .map(x -> x.getIndex())
-        .filter(x -> tableStartIndex <= x && x < tableEndIndex) // collect all 
the refs form table side
+        .map(RexSlot::getIndex)
+        .filter(x -> (tableStartIndex <= x) && (x < tableEndIndex)) // collect 
all the refs form table side
         .map(x -> x - tableStartIndex) // re-adjust the offset
         .sorted()
         .collect(Collectors.toList()); // we have a list with all the input 
from table side with 0 based index.
@@ -344,11 +357,12 @@ class JoinTranslator {
   private void validateJoinKeyType(RexInputRef ref) {
     SqlTypeName sqlTypeName = ref.getType().getSqlTypeName();
 
-    // Primitive types and ANY (for the record key) are supported in the key
+    // Primitive types and Row (for the record key) and ANY for other fields 
like __key__
+    // TODO this need to be pulled to a common class/place that have all the 
supported types
     if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != 
SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
         && sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != 
SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
         && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != 
SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT
-        && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER) 
{
+        && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER 
&& sqlTypeName != SqlTypeName.ROW) {
       log.error("Unsupported key type " + sqlTypeName + " used in join 
condition.");
       throw new SamzaException("Unsupported key type used in join condition.");
     }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
new file mode 100644
index 0000000..0ddc136
--- /dev/null
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/MessageStreamCollector.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.translator;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.function.Function;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Collector of Map and Filter Samza Functions to collect call stack on the 
top of Remote table.
+ * This Collector will be used by Join operator and trigger it when applying 
the join function post lookup.
+ *
+ * Note that this is needed because the Remote Table can not expose a proper 
{@code MessageStream}.
+ * It is a work around to minimize the amount of code changes of the current 
Query Translator {@link org.apache.samza.sql.translator.QueryTranslator},
+ * But in an ideal world, we should use Calcite planner in conventional way we 
can combine function when via translation of RelNodes.
+ */
+class MessageStreamCollector implements MessageStream<SamzaSqlRelMessage>, 
Serializable, Closeable {
+
+  /**
+   * Queue First in First to be Fired order of the operators on the top of 
Remote Table Scan.
+   */
+  private final Deque<MapFunction<? super SamzaSqlRelMessage, ? extends 
SamzaSqlRelMessage>> mapFnCallQueue =
+      new ArrayDeque<>();
+
+  /**
+   * Function to chain the call to close from each operator.
+   */
+  private transient Function<Void, Void> closeFn = aVoid -> null;
+
+  @Override
+  public <OM> MessageStream<OM> map(MapFunction<? super SamzaSqlRelMessage, ? 
extends OM> mapFn) {
+    mapFnCallQueue.offer((MapFunction<? super SamzaSqlRelMessage, ? extends 
SamzaSqlRelMessage>) mapFn);
+    return (MessageStream<OM>) this;
+  }
+
+  @Override
+  public MessageStream<SamzaSqlRelMessage> filter(FilterFunction<? super 
SamzaSqlRelMessage> filterFn) {
+    mapFnCallQueue.offer(new FilterMapAdapter(filterFn));
+    return this;
+  }
+
+  /**
+   * This function is called by the join operator on run time to apply filter 
and projects post join lookup.
+   *
+   * @param context Samza Execution Context
+   * @return {code null} case filter reject the row, Samza Relational Record 
as it goes via Projects.
+   */
+  Function<SamzaSqlRelMessage, SamzaSqlRelMessage> getFunction(Context 
context) {
+    Function<SamzaSqlRelMessage, SamzaSqlRelMessage> tailFn = null;
+    Function<Void, Void> intFn = aVoid -> null; // Projects and Filters both 
need to be initialized.
+    closeFn = aVoid -> null;
+    // At this point we have a the queue of operator, where first in is the 
first operator on top of TableScan.
+    while (!mapFnCallQueue.isEmpty()) {
+      MapFunction<? super SamzaSqlRelMessage, ? extends SamzaSqlRelMessage> f 
= mapFnCallQueue.poll();
+      intFn = intFn.andThen((aVoid) -> {
+        f.init(context);
+        return null;
+      });
+      closeFn.andThen((aVoid) -> {
+        f.close();
+        return null;
+      });
+
+      Function<SamzaSqlRelMessage, SamzaSqlRelMessage> current = x -> x == 
null ? null : f.apply(x);
+      if (tailFn == null) {
+        tailFn = current;
+      } else {
+        tailFn = current.compose(tailFn);
+      }
+    }
+    // TODO TBH not sure about this need to check if Samza Framework will be 
okay with late init call.
+    intFn.apply(null); // Init call has to happen here.
+    return tailFn == null ? Function.identity() : tailFn;
+  }
+
+  /**
+   * Filter adapter is used to compose filters with {@code 
MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage>}
+   * Filter function will return {@code null} when input is {@code null} or 
filter condition reject current row.
+   */
+  private static class FilterMapAdapter implements 
MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
+    private final FilterFunction<? super SamzaSqlRelMessage> filterFn;
+
+    private FilterMapAdapter(FilterFunction<? super SamzaSqlRelMessage> 
filterFn) {
+      this.filterFn = filterFn;
+    }
+
+    @Override
+    public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
+      if (message != null && filterFn.apply(message)) {
+        return message;
+      }
+      // null on case no match
+      return null;
+    }
+
+    @Override
+    public void close() {
+      filterFn.close();
+    }
+
+    @Override
+    public void init(Context context) {
+      filterFn.init(context);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (closeFn != null) {
+      closeFn.apply(null);
+    }
+  }
+
+  @Override
+  public <OM> MessageStream<OM> flatMap(FlatMapFunction<? super 
SamzaSqlRelMessage, ? extends OM> flatMapFn) {
+    return null;
+  }
+
+  @Override
+  public <OM> MessageStream<OM> flatMapAsync(
+      AsyncFlatMapFunction<? super SamzaSqlRelMessage, ? extends OM> 
asyncFlatMapFn) {
+    return null;
+  }
+
+  @Override
+  public void sink(SinkFunction<? super SamzaSqlRelMessage> sinkFn) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public MessageStream<SamzaSqlRelMessage> 
sendTo(OutputStream<SamzaSqlRelMessage> outputStream) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, WV> MessageStream<WindowPane<K, WV>> 
window(Window<SamzaSqlRelMessage, K, WV> window, String id) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
+      JoinFunction<? extends K, ? super SamzaSqlRelMessage, ? super OM, ? 
extends JM> joinFn, Serde<K> keySerde,
+      Serde<SamzaSqlRelMessage> messageSerde, Serde<OM> otherMessageSerde, 
Duration ttl, String id) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
+      StreamTableJoinFunction<? extends K, ? super SamzaSqlRelMessage, ? super 
R, ? extends JM> joinFn,
+      Object... args) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public MessageStream<SamzaSqlRelMessage> merge(
+      Collection<? extends MessageStream<? extends SamzaSqlRelMessage>> 
otherStreams) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super 
SamzaSqlRelMessage, ? extends K> keyExtractor,
+      MapFunction<? super SamzaSqlRelMessage, ? extends V> valueExtractor, 
KVSerde<K, V> serde, String id) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, 
Object... args) {
+    throw new IllegalStateException("Not valid state");
+  }
+
+  @Override
+  public MessageStream<SamzaSqlRelMessage> broadcast(Serde<SamzaSqlRelMessage> 
serde, String id) {
+    throw new IllegalStateException("Not valid state");
+  }
+}
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index bf44815..16a320e 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -19,12 +19,15 @@
 
 package org.apache.samza.sql.translator;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -38,7 +41,9 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.Expression;
+import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
@@ -50,7 +55,7 @@ import org.slf4j.LoggerFactory;
  * Translator to translate the Project node in the relational graph to the 
corresponding StreamGraph
  * implementation.
  */
-class ProjectTranslator {
+public class ProjectTranslator {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ProjectTranslator.class);
   //private transient int messageIndex = 0;
@@ -61,6 +66,122 @@ class ProjectTranslator {
   }
 
   /**
+   * Converts the resulting row from Calcite Expression Evaluator to 
SamzaRelRecord to be sent downstream.
+   *
+   * @param objects input objects to be converted
+   * @param rowType Calcite row type of the resulting row
+   * @return return a valid message Stream of type SamzaSqlRelRecord
+   */
+  public static SamzaSqlRelRecord buildSamzaRelRecord(Object[] objects, 
RelDataType rowType) {
+    Preconditions.checkNotNull(objects, "Input objects can not be null");
+    Preconditions.checkState(rowType.isStruct(), "Row Type has to be a Struct 
and got " + rowType.getSqlTypeName());
+    Preconditions.checkState(objects.length == rowType.getFieldCount(),
+        "Objects counts and type counts must match " + objects.length + " vs " 
+ rowType.getFieldCount());
+    List<String> names = new ArrayList<>(rowType.getFieldNames());
+    List<Object> values = new ArrayList<>(rowType.getFieldCount());
+    for (int i = 0; i < objects.length; i++) {
+      Object val = objects[i];
+      if (val == null) {
+        values.add(null);
+        continue;
+      }
+      final RelDataType valueType = rowType.getFieldList().get(i).getType();
+      values.add(convertToSamzaSqlType(val, valueType));
+    }
+    return new SamzaSqlRelRecord(names, values);
+  }
+
+  /**
+   * Recursively converts a Primitive Java Object to valid Samza Rel Record 
field type.
+   *
+   * @param value input value to be converted
+   * @param dataType value type as derived by Calcite
+   * @return SamzaRelRecord or primitive SamzaRelRecord field.
+   *
+   */
+  private static Object convertToSamzaSqlType(Object value, RelDataType 
dataType) {
+    if (value == null) {
+      return null;
+    }
+    switch (dataType.getSqlTypeName()) {
+      case ROW:
+        List<String> names = new ArrayList<>(dataType.getFieldNames());
+        // Row Struct is represent as Object array in Calcite.
+        Object[] row = (Object[]) value;
+        List<Object> values = new ArrayList<>(row.length);
+        for (int i = 0; i < row.length; i++) {
+          values.add(convertToSamzaSqlType(row[i], 
dataType.getFieldList().get(i).getType()));
+        }
+        return new SamzaSqlRelRecord(names, values);
+      case MAP:
+        Map<Object, Object> objectMap = (Map<Object, Object>) value;
+        Map<Object, Object> resultMap = new HashMap<>();
+        final RelDataType valuesType = dataType.getValueType();
+        objectMap.forEach((key, v) -> resultMap.put(key, 
convertToSamzaSqlType(v, valuesType)));
+        return resultMap;
+      case ARRAY:
+        List<Object> objectList = (List<Object>) value;
+        final RelDataType elementsType = dataType.getComponentType();
+        return objectList.stream().map(e -> convertToSamzaSqlType(e, 
elementsType)).collect(Collectors.toList());
+      case BOOLEAN:
+      case BIGINT:
+      case BINARY:
+      case INTEGER:
+      case TINYINT:
+      case DOUBLE:
+      case FLOAT:
+      case REAL:
+      case VARCHAR:
+      case CHAR:
+      case VARBINARY:
+      case ANY:
+      case OTHER:
+        // today we treat everything else as Type Any or Other, this is not 
ideal.
+        // this will change when adding timestamps support or more complex non 
java primitive types.
+        // TODO in a better world we need to add type factory that can do the 
conversion between calcite and samza.
+        return value;
+      default:
+        // As of today we treat everything else as type ANY
+        throw new IllegalStateException("Unknown SQL type " + 
dataType.getSqlTypeName());
+    }
+  }
+
+  /**
+   * Converts the Samza Record to a Java Primitive Row format that's in 
convention with Calcite Enum operators.
+   *
+   * @param samzaSqlRelRecord input record.
+   * @return row of Java Primitive conform to 
org.apache.calcite.adapter.enumerable.JavaRowFormat#ARRAY
+   */
+  public static Object[] convertToJavaRow(SamzaSqlRelRecord samzaSqlRelRecord) 
{
+    if (samzaSqlRelRecord == null) {
+      return null;
+    }
+    Object[] inputRow = new Object[samzaSqlRelRecord.getFieldValues().size()];
+    for (int i = 0; i < inputRow.length; i++) {
+      inputRow[i] = 
asPrimitiveJavaRow(samzaSqlRelRecord.getFieldValues().get(i));
+    }
+    return inputRow;
+  }
+
+  private static Object asPrimitiveJavaRow(Object inputObject) {
+    if (inputObject == null) {
+      return null;
+    }
+    if (inputObject instanceof SamzaSqlRelRecord) {
+      return convertToJavaRow((SamzaSqlRelRecord) inputObject);
+    }
+    if (inputObject instanceof List) {
+      return ((List) inputObject).stream().map(e -> 
asPrimitiveJavaRow(e)).collect(Collectors.toList());
+    }
+    if (inputObject instanceof Map) {
+      Map<Object, Object> objectMap = new HashMap<>();
+      ((Map<Object, Object>) inputObject).forEach((k, v) -> objectMap.put(k, 
asPrimitiveJavaRow(v)));
+      return objectMap;
+    }
+    return inputObject;
+  }
+
+  /**
    * ProjectMapFunction implements MapFunction to map input 
SamzaSqlRelMessages, one at a time, to a new
    * SamzaSqlRelMessage which consists of the projected fields
    */
@@ -94,6 +215,7 @@ class ProjectTranslator {
       this.translatorContext =
           ((SamzaSqlApplicationContext) 
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
       this.project = (Project) this.translatorContext.getRelNode(projectId);
+      LOG.info("Compiling operator {} ", project.getDigest());
       this.expr = 
this.translatorContext.getExpressionCompiler().compile(project.getInputs(), 
project.getProjects());
       ContainerContext containerContext = context.getContainerContext();
       metricsRegistry = containerContext.getContainerMetricsRegistry();
@@ -114,20 +236,19 @@ class ProjectTranslator {
       long arrivalTime = System.nanoTime();
       RelDataType type = project.getRowType();
       Object[] output = new Object[type.getFieldCount()];
+      Object[] inputRow = convertToJavaRow(message.getSamzaSqlRelRecord());
+      SamzaSqlExecutionContext execContext = 
translatorContext.getExecutionContext();
+      DataContext dataRootContext = translatorContext.getDataContext();
       try {
-        expr.execute(translatorContext.getExecutionContext(), context, 
translatorContext.getDataContext(),
-            message.getSamzaSqlRelRecord().getFieldValues().toArray(), output);
+        expr.execute(execContext, context, dataRootContext, inputRow, output);
       } catch (Exception e) {
         String errMsg = String.format("Handling the following rel message ran 
into an error. %s", message);
         LOG.error(errMsg, e);
         throw new SamzaException(errMsg, e);
       }
-      List<String> names = new ArrayList<>();
-      for (int index = 0; index < output.length; index++) {
-        names.add(index, project.getNamedProjects().get(index).getValue());
-      }
+      SamzaSqlRelRecord record = buildSamzaRelRecord(output, 
project.getRowType());
       updateMetrics(arrivalTime, System.nanoTime(), 
message.getSamzaSqlRelMsgMetadata().isNewInputMessage);
-      return new SamzaSqlRelMessage(names, Arrays.asList(output), 
message.getSamzaSqlRelMsgMetadata());
+      return new SamzaSqlRelMessage(record, 
message.getSamzaSqlRelMsgMetadata());
     }
 
     /**
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
index 6a93b2d..8a60502 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRemoteTableJoinFunction.java
@@ -21,6 +21,8 @@ package org.apache.samza.sql.translator;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
+import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
@@ -42,15 +44,35 @@ public class SamzaSqlRemoteTableJoinFunction
   private transient SamzaRelTableKeyConverter relTableKeyConverter;
   private final String tableName;
   private final int queryId;
+  /**
+   * Projection and Filter Function to apply post the join lookup. Function 
will return null in case filter rejects row.
+   */
+  private Function<SamzaSqlRelMessage, SamzaSqlRelMessage> projectFunction;
+  /**
+   * Projects and Filters operator queue.
+   */
+  private final MessageStreamCollector messageStreamCollector;
+
+  public SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter, 
SamzaRelTableKeyConverter tableKeyConverter,
+      JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType 
joinRelType, int queryId,
+      MessageStreamCollector messageStreamCollector) {
+    super(streamNode, tableNode, joinRelType);
+    this.msgConverter = msgConverter;
+    this.relTableKeyConverter = tableKeyConverter;
+    this.tableName = tableNode.getSourceName();
+    this.queryId = queryId;
+    this.messageStreamCollector = messageStreamCollector;
+  }
 
   SamzaSqlRemoteTableJoinFunction(SamzaRelConverter msgConverter, 
SamzaRelTableKeyConverter tableKeyConverter,
       JoinInputNode streamNode, JoinInputNode tableNode, JoinRelType 
joinRelType, int queryId) {
     super(streamNode, tableNode, joinRelType);
-
     this.msgConverter = msgConverter;
     this.relTableKeyConverter = tableKeyConverter;
     this.tableName = tableNode.getSourceName();
     this.queryId = queryId;
+    this.projectFunction = Function.identity();
+    this.messageStreamCollector = null;
   }
 
   @Override
@@ -59,13 +81,24 @@ public class SamzaSqlRemoteTableJoinFunction
         ((SamzaSqlApplicationContext) 
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
     this.msgConverter = translatorContext.getMsgConverter(tableName);
     this.relTableKeyConverter = 
translatorContext.getTableKeyConverter(tableName);
+    if (messageStreamCollector != null) {
+      projectFunction = messageStreamCollector.getFunction(context);
+    }
   }
 
+  /**
+   * Compute the projection and filter post join lookup.
+   *
+   * @param record input record as result of lookup
+   * @return the projected row or {@code null} if Row doesn't pass the filter 
condition.
+   */
   @Override
+  @Nullable
   protected List<Object> getTableRelRecordFieldValues(KV record) {
     // Using the message rel converter, convert message to sql rel message and 
add to output values.
-    SamzaSqlRelMessage relMessage = msgConverter.convertToRelMessage(record);
-    return relMessage.getSamzaSqlRelRecord().getFieldValues();
+    final SamzaSqlRelMessage relMessage = 
msgConverter.convertToRelMessage(record);
+    final SamzaSqlRelMessage result = projectFunction.apply(relMessage);
+    return result == null ? null : 
result.getSamzaSqlRelRecord().getFieldValues();
   }
 
   @Override
@@ -76,6 +109,8 @@ public class SamzaSqlRemoteTableJoinFunction
       return null;
     }
     // Using the table key converter, convert message key from rel format to 
the record key format.
+    // TODO: On way to avoid the object type here is to ensure that:
+    // table's key is a SamzaRelRecord or any well defined type when defining 
the table descriptor
     return relTableKeyConverter.convertToTableKeyFormat(keyRecord);
   }
 
@@ -83,4 +118,12 @@ public class SamzaSqlRemoteTableJoinFunction
   public Object getRecordKey(KV record) {
     return record.getKey();
   }
+
+  @Override
+  public void close() {
+    super.close();
+    if (messageStreamCollector != null) {
+      messageStreamCollector.close();
+    }
+  }
 }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
index cd7ecdc..e8fa451 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlTableJoinFunction.java
@@ -21,6 +21,7 @@ package org.apache.samza.sql.translator;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.commons.lang3.Validate;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
@@ -48,6 +49,7 @@ public abstract class SamzaSqlTableJoinFunction<K, R>
   // Table field names are used in the outer join when the table record is not 
found.
   private final ArrayList<String> tableFieldNames;
   private final ArrayList<String> outFieldNames;
+  final private List<Object> nullRow;
 
   SamzaSqlTableJoinFunction(JoinInputNode streamNode, JoinInputNode tableNode, 
JoinRelType joinRelType) {
     this.joinRelType = joinRelType;
@@ -69,6 +71,7 @@ public abstract class SamzaSqlTableJoinFunction<K, R>
       outFieldNames.addAll(tableFieldNames);
       outFieldNames.addAll(streamNode.getFieldNames());
     }
+    nullRow = tableFieldNames.stream().map(x -> 
null).collect(Collectors.toList());
   }
 
   @Override
@@ -93,7 +96,10 @@ public abstract class SamzaSqlTableJoinFunction<K, R>
 
     // Add the table record fields.
     if (record != null) {
-      outFieldValues.addAll(getTableRelRecordFieldValues(record));
+      List<Object> row = getTableRelRecordFieldValues(record);
+      // null in case the filter did not match thus row has to be removed if 
inner join or padded null case outer join
+      if (row == null && joinRelType.compareTo(JoinRelType.INNER) == 0) return 
null;
+      outFieldValues.addAll(row == null ? nullRow : row);
     } else {
       // Table record could be null as the record could not be found in the 
store. This can
       // happen for outer joins. Add nulls to all the field values in the 
output message.
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index f58140d..87c4e00 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -170,7 +170,10 @@ class ScanTranslator {
     // SqlIOResolverFactory.
     // For local table, even though table descriptor is already defined, we 
still need to create the input stream
     // descriptor to load the local table.
+    // To handle case where a project or filter is pushed to Remote table Scan 
will collect the operators and feed it to the join operator.
+    // TODO In an ideal world this has to change and use Calcite Pattern 
matching to translate the plan.
     if (isRemoteTable) {
+      context.registerMessageStream(tableScan.getId(), new 
MessageStreamCollector());
       return;
     }
 
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index 5990897..7d85652 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelTableKeyConverter;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.sql.data.RexToJavaCompiler;
@@ -52,7 +53,7 @@ public class TranslatorContext implements Cloneable {
   private final RexToJavaCompiler compiler;
   private final Map<String, SamzaRelConverter> relSamzaConverters;
   private final Map<String, SamzaRelTableKeyConverter> relTableKeyConverters;
-  private final Map<Integer, MessageStream> messageStreams;
+  private final Map<Integer, MessageStream<SamzaSqlRelMessage>> messageStreams;
   private final Map<Integer, RelNode> relNodes;
   private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
 
@@ -199,7 +200,7 @@ public class TranslatorContext implements Cloneable {
    * @param id the id
    * @return the message stream
    */
-  MessageStream getMessageStream(int id) {
+  MessageStream<SamzaSqlRelMessage> getMessageStream(int id) {
     return messageStreams.get(id);
   }
 
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java 
b/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
new file mode 100644
index 0000000..87cf486
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/udf/GetNestedField.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.udf;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.calcite.adapter.enumerable.CallImplementor;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.enumerable.RexImpTable;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.tree.ConstantExpression;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.ExpressionType;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.ImplementableFunction;
+import org.apache.calcite.schema.ScalarFunction;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+
+import static org.apache.calcite.schema.impl.ReflectiveFunctionBase.builder;
+
+
+/**
+ * Operator to extract nested Rows or Fields form a struct row type using a 
dotted path.
+ * The goal of this operator is two-fold.
+ * First it is a temporary fix for 
https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
+ * Second it will enable smooth backward compatible migration from existing 
udf that relies on legacy row format.
+ */
+public class GetNestedField extends SqlUserDefinedFunction {
+
+  public static final SqlFunction INSTANCE = new GetNestedField(new 
ExtractFunction());
+
+  public GetNestedField(Function function) {
+    super(new SqlIdentifier("GetNestedField", SqlParserPos.ZERO), null, null, 
null, ImmutableList.of(), function);
+  }
+
+  @Override
+  public SqlOperandCountRange getOperandCountRange() {
+    return SqlOperandCountRanges.of(2);
+  }
+
+  @Override
+  public boolean checkOperandTypes(SqlCallBinding callBinding, boolean 
throwOnFailure) {
+    final SqlNode left = callBinding.operand(0);
+    final SqlNode right = callBinding.operand(1);
+    final RelDataType type = 
callBinding.getValidator().deriveType(callBinding.getScope(), left);
+    boolean isRow = true;
+    if (type.getSqlTypeName() != SqlTypeName.ROW) {
+      isRow = false;
+    } else if (type.getSqlIdentifier().isStar()) {
+      isRow = false;
+    }
+    if (!isRow && throwOnFailure) {
+      throw callBinding.newValidationSignatureError();
+    }
+    return isRow && OperandTypes.STRING.checkSingleOperandType(callBinding, 
right, 0, throwOnFailure);
+  }
+
+  @Override
+  public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+    final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+    final RelDataType recordType = opBinding.getOperandType(0);
+    switch (recordType.getSqlTypeName()) {
+      case ROW:
+        final String fieldName = opBinding.getOperandLiteralValue(1, 
String.class);
+        String[] fieldNameChain = fieldName.split("\\.");
+        RelDataType relDataType = opBinding.getOperandType(0);
+        for (int i = 0; i < fieldNameChain.length; i++) {
+          RelDataTypeField t = relDataType.getField(fieldNameChain[i], true, 
true);
+          Preconditions.checkNotNull(t,
+              "Can not find " + fieldNameChain[i] + " within record " + 
recordType.toString() + " Original String "
+                  + Arrays.toString(fieldNameChain) + " Original row " + 
recordType.toString());
+          relDataType = t.getType();
+        }
+        if (recordType.isNullable()) {
+          return typeFactory.createTypeWithNullability(relDataType, true);
+        } else {
+          return relDataType;
+        }
+      default:
+        throw new AssertionError("First Operand is suppose to be a Row 
Struct");
+    }
+  }
+
+  private static class ExtractFunction implements ScalarFunction, 
ImplementableFunction {
+    private final JavaTypeFactoryImpl javaTypeFactory = new 
JavaTypeFactoryImpl();
+
+    @Override
+    public CallImplementor getImplementor() {
+      return RexImpTable.createImplementor((translator, call, 
translatedOperands) -> {
+        Preconditions.checkState(translatedOperands.size() == 2 && 
call.operands.size() == 2,
+            "Expected 2 operands found " + Math.min(translatedOperands.size(), 
call.getOperands().size()));
+        Expression op0 = translatedOperands.get(0);
+        Expression op1 = translatedOperands.get(1);
+        
Preconditions.checkState(op1.getNodeType().equals(ExpressionType.Constant),
+            "Operand 2 has to be constant and got " + op1.getNodeType());
+        Preconditions.checkState(op1.type.equals(String.class), "Operand 2 has 
to be String and got " + op1.type);
+        final String fieldName = (String) ((ConstantExpression) op1).value;
+        String[] fieldNameChain = fieldName.split("\\.");
+        RelDataType relDataType = call.operands.get(0).getType();
+        
Preconditions.checkState(relDataType.getSqlTypeName().equals(SqlTypeName.ROW),
+            "Expected first operand to be ROW found " + 
relDataType.toString());
+        Expression currentExpression = op0;
+        for (int i = 0; i < fieldNameChain.length; i++) {
+          Preconditions.checkState(relDataType.getSqlTypeName() == 
SqlTypeName.ROW,
+              "Must be ROW found " + relDataType.toString());
+          RelDataTypeField t = relDataType.getField(fieldNameChain[i], true, 
true);
+          Preconditions.checkNotNull(t,
+              "Notfound " + fieldNameChain[i] + " in the following struct " + 
relDataType.toString()
+                  + " Original String " + Arrays.toString(fieldNameChain) + " 
Original row " + call.operands.get(0)
+                  .getType());
+          currentExpression = 
Expressions.arrayIndex(Expressions.convert_(currentExpression, Object[].class),
+              Expressions.constant(t.getIndex()));
+          relDataType = t.getType();
+        }
+        Type fieldType = javaTypeFactory.getJavaClass(relDataType);
+        return EnumUtils.convert(currentExpression, fieldType);
+      }, NullPolicy.ARG0, false);
+    }
+
+    @Override
+    public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+      throw new IllegalStateException("should not be called");
+    }
+
+    @Override
+    public List<FunctionParameter> getParameters() {
+      return builder().add(Object[].class, "row").add(String.class, 
"path").build();
+    }
+  }
+
+  @Override
+  public String getAllowedSignatures(String opNameToUse) {
+    return opNameToUse + "(<ROW>, <VARCHAR>)";
+  }
+}
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java 
b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
index 2a9dc13..d234067 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
@@ -129,7 +129,7 @@ public class TestQueryPlanner {
     LogicalJoin join = (LogicalJoin) relNode;
     RelNode left = join.getLeft();
     RelNode right = join.getRight();
-    assertTrue(right instanceof LogicalTableScan);
+    assertTrue("Was instance of "  + right.getClass(), right instanceof 
LogicalProject);
     if (enableOptimizer) {
       assertTrue(left instanceof LogicalFilter);
       assertEquals("=(1, $2)", ((LogicalFilter) 
left).getCondition().toString());
@@ -187,9 +187,9 @@ public class TestQueryPlanner {
     relNode = relNode.getInput(0);
     assertTrue(relNode instanceof LogicalFilter);
     if (enableOptimizer) {
-      assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) 
relNode).getCondition().toString());
+      assertEquals("AND(=($2, $10), =($2, 'Mike'))", ((LogicalFilter) 
relNode).getCondition().toString());
     } else {
-      assertEquals("AND(=($2, $9), =(1, $10), =($2, 'Mike'))", 
((LogicalFilter) relNode).getCondition().toString());
+      assertEquals("AND(=(1, $11), =($2, $10), =($2, 'Mike'))", 
((LogicalFilter) relNode).getCondition().toString());
     }
     relNode = relNode.getInput(0);
     if (enableOptimizer) {
@@ -202,7 +202,7 @@ public class TestQueryPlanner {
     LogicalJoin join = (LogicalJoin) relNode;
     RelNode left = join.getLeft();
     RelNode right = join.getRight();
-    assertTrue(left instanceof LogicalTableScan);
+    assertTrue("was instance of " + left.getClass(), left instanceof 
LogicalProject);
     if (enableOptimizer) {
       assertTrue(right instanceof LogicalFilter);
       assertEquals("=(1, $2)", ((LogicalFilter) 
right).getCondition().toString());
@@ -289,14 +289,14 @@ public class TestQueryPlanner {
     assertTrue(relNode instanceof LogicalProject);
     relNode = relNode.getInput(0);
     assertTrue(relNode instanceof LogicalFilter);
-    assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) 
relNode).getCondition().toString());
+    assertEquals("AND(=($2, $10), =($2, 'Mike'))", ((LogicalFilter) 
relNode).getCondition().toString());
     relNode = relNode.getInput(0);
     assertTrue(relNode instanceof LogicalJoin);
     assertEquals(2, relNode.getInputs().size());
     LogicalJoin join = (LogicalJoin) relNode;
     RelNode left = join.getLeft();
     RelNode right = join.getRight();
-    assertTrue(left instanceof LogicalTableScan);
+    assertTrue(left instanceof LogicalProject);
     assertTrue(right instanceof LogicalFilter);
     assertEquals("=($2, CAST(MyTest($2)):INTEGER)", ((LogicalFilter) 
right).getCondition().toString());
     assertTrue(right.getInput(0) instanceof LogicalTableScan);
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 47355be..cc33e5f 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -258,6 +258,9 @@ public class TestAvroSystemFactory implements SystemFactory 
{
       record.put("address", createProfileAddressRecord(index));
       record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ? 
null : index % COMPANIES.length);
       record.put("phoneNumbers", createProfilePhoneNumbers(index % 
PHONE_NUMBERS.length));
+      Map<String, Object> mapValues = new HashMap<>();
+      mapValues.put("key", createSimpleRecord(index, false));
+      record.put("mapValues", mapValues);
       return record;
     }
 
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index dc193a2..33e775b 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -18,6 +18,7 @@
 */
 package org.apache.samza.sql.translator;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,7 +29,10 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Pair;
 import 
org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
 import org.apache.samza.context.ContainerContext;
@@ -68,6 +72,7 @@ import static org.mockito.Mockito.when;
 @PrepareForTest(LogicalProject.class)
 public class TestProjectTranslator extends TranslatorTestBase {
   private static final String LOGICAL_OP_ID = "sql0_project_0";
+  private static final String TEST_FIELD = "test_field";
 
   @Test
   public void testTranslate() throws IOException, ClassNotFoundException {
@@ -86,12 +91,23 @@ public class TestProjectTranslator extends 
TranslatorTestBase {
     when(mockProject.getInputs()).thenReturn(inputs);
     when(mockProject.getInput()).thenReturn(mockInput);
     RelDataType mockRowType = mock(RelDataType.class);
+    List<RelDataTypeField> relFields = new ArrayList<>();
+    String fieldName = TEST_FIELD;
+    int fieldPos = 0;
+    RelDataType dataType = mock(RelDataType.class);
+    when(dataType.getSqlTypeName()).thenReturn(SqlTypeName.ANY);
+    relFields.add(new RelDataTypeFieldImpl(fieldName, fieldPos, dataType));
     when(mockRowType.getFieldCount()).thenReturn(1);
     when(mockProject.getRowType()).thenReturn(mockRowType);
+    
when(mockProject.getRowType().getSqlTypeName()).thenReturn(SqlTypeName.ROW);
+    when(mockProject.getRowType().getFieldList()).thenReturn(relFields);
+    when(mockProject.getRowType().isStruct()).thenReturn(true);
     RexNode mockRexField = mock(RexNode.class);
     List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
-    namedProjects.add(Pair.of(mockRexField, "test_field"));
+    namedProjects.add(Pair.of(mockRexField, TEST_FIELD));
     when(mockProject.getNamedProjects()).thenReturn(namedProjects);
+    when(mockProject.getRowType()).thenReturn(mockRowType);
+    
when(mockProject.getRowType().getFieldNames()).thenReturn(ImmutableList.of(TEST_FIELD));
     StreamApplicationDescriptorImpl mockAppDesc = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = 
mock(OperatorSpec.class);
     MessageStream<SamzaSqlRelMessage> mockStream = new 
MessageStreamImpl<>(mockAppDesc, mockInputOp);
@@ -152,7 +168,7 @@ public class TestProjectTranslator extends 
TranslatorTestBase {
     }).when(mockExpr).execute(eq(executionContext), eq(mockContext), 
eq(dataContext),
         eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), 
eq(result));
     SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
-    assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(), 
Collections.singletonList("test_field"));
+    assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(), 
Collections.singletonList(TEST_FIELD));
     assertEquals(retMsg.getSamzaSqlRelRecord().getFieldValues(), 
Collections.singletonList(mockFieldObj));
 
     // Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 07c4ce2..931036e 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -313,30 +313,6 @@ public class TestQueryTranslator {
   }
 
   @Test (expected = SamzaException.class)
-  public void testTranslateStreamTableJoinWithoutJoinOperator() {
-    Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
-    String sql =
-        "Insert into testavro.enrichedPageViewTopic(profileName, pageKey)"
-            + " select p.name as profileName, pv.pageKey"
-            + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"
-            + " where p.id = pv.profileId";
-    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
-    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, 
new MapConfig(config));
-
-    List<String> sqlStmts = fetchSqlFromConfig(config);
-    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
-    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new 
SamzaSqlApplicationConfig(new MapConfig(config),
-        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toList()),
-        
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
-
-    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
-    QueryTranslator translator = new QueryTranslator(streamAppDesc, 
samzaSqlApplicationConfig);
-
-    translator.translate(queryInfo.get(0), streamAppDesc, 0);
-  }
-
-  @Test (expected = SamzaException.class)
   public void testTranslateStreamTableJoinWithFullJoinOperator() {
     Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1);
     String sql =
@@ -656,9 +632,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2", 
output1PhysicalName);
+    
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_3", 
output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2", 
output2PhysicalName);
+    
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_3", 
output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -668,9 +644,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_2", 
input3PhysicalName);
+    
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_sql_0_join_3", 
input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_2", 
input4PhysicalName);
+    
Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_sql_0_join_3", 
input4PhysicalName);
   }
 
   @Test
@@ -724,9 +700,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", 
output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", 
output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", 
output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", 
output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -736,9 +712,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", 
input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", 
input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", 
input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", 
input4PhysicalName);
   }
 
   @Test
@@ -791,9 +767,9 @@ public class TestQueryTranslator {
 
     Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", 
output1PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", 
output1PhysicalName);
     Assert.assertEquals("kafka", output2System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", 
output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", 
output2PhysicalName);
     Assert.assertEquals("testavro", output3System);
     Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
@@ -803,9 +779,9 @@ public class TestQueryTranslator {
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PAGEVIEW", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_2", 
input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_sql_0_join_3", 
input3PhysicalName);
     Assert.assertEquals("kafka", input4System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_2", 
input4PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-stream_sql_0_join_3", 
input4PhysicalName);
   }
 
   @Test
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index b622e75..ca78af2 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -499,7 +499,6 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(IntStream.range(0, 
numMessages).boxed().collect(Collectors.toList()), outMessages);
   }
 
-  @Ignore
   @Test
   public void testEndToEndNestedRecord() throws SamzaSqlValidatorException {
     int numMessages = 10;
@@ -507,9 +506,12 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
     String sql1 =
-        "Insert into testavro.outputTopic"
-            + " select `phoneNumbers`[0].`kind`"
-            + " from testavro.PROFILE as p";
+        "Insert into testavro.outputTopic (id, bool_value)"
+            // SQL array is one indexed.
+            + " select `phoneNumbers`[1].`kind` as string_value, 
p.address.streetnum.number as id, "
+            + " `phoneNumbers`[1].`kind` = 'Home' as bool_value, 
cast(p.address.zip as bigint) as long_value"
+            + " from testavro.PROFILE as p where p.address.zip > 0 and 
p.address.zip < 100003 ";
+
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
 
@@ -523,6 +525,72 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(numMessages, outMessages.size());
   }
 
+  /**
+   * Testing the getNestedField built in operator
+   * @throws SamzaSqlValidatorException
+   */
+  @Test
+  public void testEndToEndGetNestedFieldOperator() throws 
SamzaSqlValidatorException {
+    int numMessages = 10;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    String sql1 =
+        "Insert into testavro.outputTopic (string_value, id, bool_value, 
double_value, map_values, long_value)"
+            + " select GetNestedField(address, 'streetnum.number') * 
getNestedField(mapValues['key'], 'id') as id, "
+            + " cast(GetNestedField(address, 'streetnum').number * 1.0 as 
double) as double_value, mapValues as map_values, "
+            + " GetNestedField(phoneNumbers[1] ,'kind') = 'Home' as 
bool_value, cast( mapValues['key'].id as bigint) as long_value , "
+            + " GetNestedField(mapValues['key'], 'name') as string_value "
+            + " from testavro.PROFILE as p  where GetNestedField(address, 
'zip') > 0 and GetNestedField(address, 'zip') < 100003";
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+    runApplication(config);
+
+    List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
+    // check that the projected values are not null, correct types and good 
values when easy to check.
+    List<GenericRecord> actualResult = outMessages.stream()
+        .map(x -> (GenericRecord) x.getMessage())
+        .filter(x -> (Boolean) x.get("bool_value"))
+        .filter(x -> x.get("string_value") != null && 
!x.get("string_value").toString().isEmpty())
+        .filter(x -> x.get("map_values") instanceof Map)
+        .filter(x -> x.get("id") instanceof Integer)
+        .filter(x -> (Long) x.get("long_value") < 10 && (Long) 
x.get("long_value") >= 0)
+        .filter(x -> x.get("double_value") instanceof Double && (Double) 
x.get("double_value") >= 1234.0)
+        .collect(Collectors.toList());
+    Assert.assertEquals(
+        "Wrong results size, check the test condition against the Actual 
outputs -> " + outMessages.toString(),
+        numMessages, actualResult.size());
+  }
+
+
+  @Test
+  public void testEndToEndNestedRecordProjectFilter() throws 
SamzaSqlValidatorException {
+    int numMessages = 10;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+
+    String sql1 = " Insert into testavro.PROFILE1 select 
(p.address.streetnum.number * p.address.zip) as id , "
+        + " p.address, `phoneNumbers`[1].`kind` = 'Home' as selfEmployed, "
+        + " MAP[cast(id as varchar), `phoneNumbers`[1].number] as mapValues, 
phoneNumbers, "
+        + " cast(companyId as varchar) || name ||`phoneNumbers`[1].number || 
'concat' as name , "
+        + " 100 * ((companyId + 122) / 3 ) as companyId "
+        + " from testavro.PROFILE as p where p.address.zip > 0 "
+        + " and p.address.zip < 100003 and p.address.streetnum.number > 0 ";
+
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
+    List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
+    Assert.assertEquals(numMessages, outMessages.size());
+  }
+
   @Test
   public void testEndToEndFlattenWithUdf() throws Exception {
     int numMessages = 20;
@@ -647,7 +715,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.PROFILE1(id, address) "
-        + "select id, BuildOutputRecord('key', GetNestedField(address, 'zip')) 
as address from testavro.PROFILE";
+        + "select id, BuildOutputRecord('key', p.address.zip) as address from 
testavro.PROFILE as p";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     runApplication(new MapConfig(staticConfigs));
@@ -724,7 +792,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
             + "       p.name as profileName, p.address as profileAddress "
             + "from testavro.PROFILE.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
-            + " on p.id = pv.profileId";
+            + " on p.id = pv.profileId where p.name = 'Mike' or p.name is not 
null";
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
@@ -745,7 +813,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testEndToEndStreamTableInnerJoinWithPrimaryKey() throws 
Exception {
+  public void testEndToEndStreamTableInnerJoinWithPrimaryKey() {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index c41038d..bd541c6 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -138,7 +138,7 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
             + "       p.name as profileName, p.address as profileAddress "
             + "from testRemoteStore.Profile.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId";
+            + " on p.__key__ = pv.profileId where p.name is not null";
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
@@ -336,7 +336,7 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
             + "       p.name as profileName, p.address as profileAddress "
             + "from testRemoteStore.Profile.`$table` as p "
             + "right join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId";
+            + " on p.__key__ = pv.profileId where p.name is null or  p.name <> 
'0'";
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
@@ -400,6 +400,82 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
 
 
   @Test
+  public void testSourceEndToEndWithFilterAndInnerJoin() throws 
SamzaSqlValidatorException {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs =
+        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
+    populateProfileTable(staticConfigs, numMessages);
+
+    String sql = "Insert into testavro.enrichedPageViewTopic "
+        + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 
'N/A') as companyName,"
+        + "       p.name as profileName, p.address as profileAddress "
+        + "from testavro.PAGEVIEW as pv  "
+        + "join testRemoteStore.Profile.`$table` as p "
+        + " on p.__key__ = pv.profileId"
+        + "  where p.name <> 'Mike' ";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
"," + (
+            ((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null"
+                : ((GenericRecord) 
x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    List<String> expectedOutMessages = 
TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages)
+        .stream()
+        .filter(x -> !x.contains("Mike"))
+        .collect(Collectors.toList());
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
+  public void testSourceEndToEndWithFilterAndLeftOuterJoin() throws 
SamzaSqlValidatorException {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs =
+        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), 
numMessages, true);
+    populateProfileTable(staticConfigs, numMessages);
+
+    String sql = "Insert into testavro.enrichedPageViewTopic "
+        + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 
'N/A') as companyName,"
+        + "       p.name as profileName, p.address as profileAddress "
+        + "from testavro.PAGEVIEW as pv  "
+        + " LEFT Join testRemoteStore.Profile.`$table` as p "
+        + " on  pv.profileId + 1 - (2/2) = p.__key__ "
+        + "  where p.name <> 'Mary' or p.name is null";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+
+    runApplication(config);
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + 
"," + (
+            ((GenericRecord) x.getMessage()).get("profileName") == null ? 
"null"
+                : ((GenericRecord) 
x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    List<String> expectedOutMessages =
+        
TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages)
+            .stream()
+            .filter(x -> !x.contains("Mary"))
+            .collect(Collectors.toList());
+
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
   public void testSameJoinTargetSinkEndToEndRightOuterJoin() throws 
SamzaSqlValidatorException {
     int numMessages = 21;
 

Reply via email to