This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 1908622  Samza-2253: Upgrading calcite to 1.19 (#1085)
1908622 is described below

commit 190862207f6f6f6959725884a340762329097d1d
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Tue Jun 18 15:06:57 2019 -0700

    Samza-2253: Upgrading calcite to 1.19 (#1085)
    
    * Upgrading calcite to 1.19
    
    * Fixing the flatten testcase and adding another test
    
    * minor fixup
    
    * Removing a flaky test
---
 gradle/dependency-versions.gradle                  |   2 +-
 .../apache/samza/sql/data/RexToJavaCompiler.java   |  51 +++----
 .../java/org/apache/samza/sql/fn/FlattenUdf.java   |   2 +-
 .../samza/sql/translator/ProjectTranslator.java    |  73 +++++-----
 .../sql/translator/TestProjectTranslator.java      | 155 ---------------------
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  |  24 +++-
 6 files changed, 88 insertions(+), 219 deletions(-)

diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 9279051..6fe812f 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -19,7 +19,7 @@
  ext {
   apacheCommonsCollections4Version = "4.0"
   avroVersion = "1.7.1"
-  calciteVersion = "1.17.0"
+  calciteVersion = "1.19.0"
   commonsCliVersion = "1.2"
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
index 37ce229..bc881d3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -1,21 +1,21 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.sql.data;
 
@@ -29,7 +29,6 @@ import java.lang.reflect.Type;
 import java.util.List;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.enumerable.JavaRowFormat;
-import org.apache.calcite.adapter.enumerable.PhysType;
 import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
 import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -47,6 +46,8 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.calcite.util.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
@@ -117,17 +118,17 @@ public class RexToJavaCompiler {
     final ParameterExpression root = DataContext.ROOT;
     final ParameterExpression inputValues = 
Expressions.parameter(Object[].class, "inputValues");
     final ParameterExpression outputValues = 
Expressions.parameter(Object[].class, "outputValues");
-    final JavaTypeFactoryImpl javaTypeFactory = new 
SamzaSqlJavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+    final JavaTypeFactoryImpl javaTypeFactory =
+        new 
SamzaSqlJavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
 
     // public void execute(Object[] inputValues, Object[] outputValues)
     final RexToLixTranslator.InputGetter inputGetter = new 
RexToLixTranslator.InputGetterImpl(ImmutableList.of(
-        Pair.of(
-            Expressions.variable(Object[].class, "inputValues"),
+        Pair.of(Expressions.variable(Object[].class, "inputValues"),
             PhysTypeImpl.of(javaTypeFactory, inputRowType, 
JavaRowFormat.ARRAY, false))));
 
     final List<org.apache.calcite.linq4j.tree.Expression> list =
-        RexToLixTranslator.translateProjects(program, javaTypeFactory, 
builder, null, DataContext.ROOT, inputGetter,
-            null);
+        RexToLixTranslator.translateProjects(program, javaTypeFactory, 
SqlConformanceEnum.DEFAULT, builder, null,
+            DataContext.ROOT, inputGetter, null);
     for (int i = 0; i < list.size(); i++) {
       builder.add(Expressions.statement(
           Expressions.assign(Expressions.arrayIndex(outputValues, 
Expressions.constant(i)), list.get(i))));
@@ -211,8 +212,8 @@ public class RexToJavaCompiler {
    * Represents the methods in the class {@link Expression}
    */
   public enum SamzaBuiltInMethod {
-    EXPR_EXECUTE2(org.apache.samza.sql.data.Expression.class, "execute", 
SamzaSqlExecutionContext.class,
-        Context.class, DataContext.class, Object[].class, Object[].class);
+    EXPR_EXECUTE2(org.apache.samza.sql.data.Expression.class, "execute", 
SamzaSqlExecutionContext.class, Context.class,
+        DataContext.class, Object[].class, Object[].class);
 
     public final Method method;
 
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
index a24a992..1909fd4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
@@ -36,6 +36,6 @@ public class FlattenUdf implements ScalarUdf {
 
   @SamzaSqlUdfMethod(params = SamzaSqlFieldType.ARRAY)
   public Object execute(List value) {
-    return value != null && !value.isEmpty() ? value.get(0) : value;
+    return value;
   }
 }
\ No newline at end of file
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 8bbe84a..5b0dcc7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -1,21 +1,21 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.sql.translator;
 
@@ -26,10 +26,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.samza.SamzaException;
@@ -57,6 +57,7 @@ class ProjectTranslator {
   private static final Logger LOG = 
LoggerFactory.getLogger(ProjectTranslator.class);
   //private transient int messageIndex = 0;
   private final int queryId;
+
   ProjectTranslator(int queryId) {
     this.queryId = queryId;
   }
@@ -92,7 +93,8 @@ class ProjectTranslator {
     @Override
     public void init(Context context) {
       this.context = context;
-      this.translatorContext = ((SamzaSqlApplicationContext) 
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+      this.translatorContext =
+          ((SamzaSqlApplicationContext) 
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
       this.project = (Project) this.translatorContext.getRelNode(projectId);
       this.expr = 
this.translatorContext.getExpressionCompiler().compile(project.getInputs(), 
project.getProjects());
       ContainerContext containerContext = context.getContainerContext();
@@ -137,7 +139,6 @@ class ProjectTranslator {
       outputEvents.inc();
       processingTime.update(Duration.between(arrivalTime, 
outputTime).toMillis());
     }
-
   }
 
   private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer 
flattenIndex,
@@ -147,13 +148,14 @@ class ProjectTranslator {
       if (field != null && field instanceof List) {
         List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
         SamzaSqlRelMsgMetadata messageMetadata = 
message.getSamzaSqlRelMsgMetadata();
-        SamzaSqlRelMsgMetadata newMetadata = new 
SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(),
-            messageMetadata.getarrivalTime(), messageMetadata.getscanTime(), 
true);
+        SamzaSqlRelMsgMetadata newMetadata =
+            new SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(), 
messageMetadata.getarrivalTime(),
+                messageMetadata.getscanTime(), true);
         for (Object fieldValue : (List) field) {
           List<Object> newValues = new 
ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues());
           newValues.set(flattenIndex, Collections.singletonList(fieldValue));
-          outMessages.add(new 
SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues,
-              newMetadata));
+          outMessages.add(
+              new 
SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues, 
newMetadata));
           newMetadata = new SamzaSqlRelMsgMetadata(newMetadata.getEventTime(), 
newMetadata.getarrivalTime(),
               newMetadata.getscanTime(), false);
         }
@@ -170,14 +172,19 @@ class ProjectTranslator {
         && ((RexCall) rexNode).op.getName().equalsIgnoreCase("flatten");
   }
 
-  private Integer getProjectIndex(RexNode rexNode) {
-    return ((RexInputRef) ((RexCall) rexNode).getOperands().get(0)).getIndex();
-  }
-
   void translate(final Project project, final String logicalOpId, final 
TranslatorContext context) {
     MessageStream<SamzaSqlRelMessage> messageStream = 
context.getMessageStream(project.getInput().getId());
-    List<Integer> flattenProjects =
-        
project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());
+
+    final int projectId = project.getId();
+
+    MessageStream<SamzaSqlRelMessage> outputStream =
+        messageStream.map(new ProjectMapFunction(projectId, queryId, 
logicalOpId));
+
+    List<RexNode> projects = project.getProjects();
+    List<Integer> flattenProjects = IntStream.range(0, projects.size())
+        .filter(i -> this.isFlatten(projects.get(i)))
+        .boxed()
+        .collect(Collectors.toList());
 
     if (flattenProjects.size() > 0) {
       if (flattenProjects.size() > 1) {
@@ -185,13 +192,9 @@ class ProjectTranslator {
         LOG.error(msg);
         throw new SamzaException(msg);
       }
-      messageStream = translateFlatten(flattenProjects.get(0), messageStream);
+      outputStream = translateFlatten(flattenProjects.get(0), outputStream);
     }
 
-    final int projectId = project.getId();
-
-    MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(new 
ProjectMapFunction(projectId, queryId, logicalOpId));
-
     context.registerMessageStream(project.getId(), outputStream);
     context.registerRelNode(project.getId(), project);
   }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 0129284..3a2b943 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -171,159 +171,4 @@ public class TestProjectTranslator extends TranslatorTestBase {
     assertEquals(1, 
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
 
   }
-
-
-  @Test
-  public void testTranslateWithFlatten() throws IOException, 
ClassNotFoundException {
-    // setup mock values to the constructor of ProjectTranslator
-    LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
-    TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
-    Context mockContext = mock(Context.class);
-    ContainerContext mockContainerContext = mock(ContainerContext.class);
-    TestMetricsRegistryImpl testMetricsRegistryImpl = new 
TestMetricsRegistryImpl();
-
-    RelNode mockInput = mock(RelNode.class);
-    List<RelNode> inputs = new ArrayList<>();
-    inputs.add(mockInput);
-    when(mockInput.getId()).thenReturn(1);
-    when(mockProject.getId()).thenReturn(2);
-    when(mockProject.getInputs()).thenReturn(inputs);
-    when(mockProject.getInput()).thenReturn(mockInput);
-    RelDataType mockRowType = mock(RelDataType.class);
-    when(mockRowType.getFieldCount()).thenReturn(1);
-    when(mockProject.getRowType()).thenReturn(mockRowType);
-    RexNode mockRexField = mock(RexNode.class);
-    List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
-    namedProjects.add(Pair.of(mockRexField, "test_field"));
-    when(mockProject.getNamedProjects()).thenReturn(namedProjects);
-    List<RexNode> flattenProjects = new ArrayList<>();
-    RexCall mockFlattenProject = mock(RexCall.class);
-    SqlUserDefinedFunction sqlFlattenUdf = mock(SqlUserDefinedFunction.class);
-    when(sqlFlattenUdf.getName()).thenReturn("flatten");
-    List<RexNode> flattenUdfOperands = new ArrayList<>();
-    RexInputRef rexInputRef = mock(RexInputRef.class);
-    when(rexInputRef.getIndex()).thenReturn(0);
-    flattenUdfOperands.add(rexInputRef);
-    when(mockFlattenProject.getOperands()).thenReturn(flattenUdfOperands);
-    Whitebox.setInternalState(mockFlattenProject, "op", sqlFlattenUdf);
-    flattenProjects.add(mockFlattenProject);
-    when(mockProject.getProjects()).thenReturn(flattenProjects);
-
-    StreamApplicationDescriptorImpl mockAppDesc = 
mock(StreamApplicationDescriptorImpl.class);
-    OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = new 
OperatorSpec(OperatorSpec.OpCode.INPUT, "1") {
-
-      @Override
-      public WatermarkFunction getWatermarkFn() {
-        return null;
-      }
-
-      @Override
-      public ScheduledFunction getScheduledFn() {
-        return null;
-      }
-    };
-
-    MessageStream<SamzaSqlRelMessage> mockStream = new 
MessageStreamImpl<>(mockAppDesc, mockInputOp);
-    when(mockTranslatorContext.getMessageStream(eq(1))).thenReturn(mockStream);
-    
doAnswer(this.getRegisterMessageStreamAnswer()).when(mockTranslatorContext).registerMessageStream(eq(2),
 any(MessageStream.class));
-    RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
-    
when(mockTranslatorContext.getExpressionCompiler()).thenReturn(mockCompiler);
-    Expression mockExpr = mock(Expression.class);
-    when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
-
-    // Apply translate() method to verify that we are getting the correct map 
operator constructed
-    ProjectTranslator projectTranslator = new ProjectTranslator(1);
-    projectTranslator.translate(mockProject, LOGICAL_OP_ID, 
mockTranslatorContext);
-    // make sure that context has been registered with LogicFilter and output 
message streams
-    verify(mockTranslatorContext, times(1)).registerRelNode(2, mockProject);
-    verify(mockTranslatorContext, times(1)).registerMessageStream(2, 
this.getRegisteredMessageStream(2));
-    when(mockTranslatorContext.getRelNode(2)).thenReturn(mockProject);
-    
when(mockTranslatorContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
-
-
-    Collection<OperatorSpec>
-        nextOps = ((OperatorSpec) Whitebox.getInternalState(mockStream, 
"operatorSpec")).getRegisteredOperatorSpecs();
-    StreamOperatorSpec flattenOp = (StreamOperatorSpec) 
nextOps.iterator().next();
-    assertNotNull(flattenOp);
-    Object testObj = new Object();
-    SamzaSqlRelMessage mockMsg = new SamzaSqlRelMessage(new 
ArrayList<String>() {{
-      this.add("test_field_no1");
-    }}, new ArrayList<Object>() {{
-      this.add(testObj);
-    }}, new SamzaSqlRelMsgMetadata("", "", ""));
-    Collection<SamzaSqlRelMessage> flattenedMsgs = 
flattenOp.getTransformFn().apply(mockMsg);
-    assertTrue(flattenedMsgs.size() == 1);
-    assertTrue(flattenedMsgs.stream().anyMatch(s -> 
s.getSamzaSqlRelRecord().getFieldValues().get(0).equals(testObj)));
-    List<Integer> testList = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      testList.add(new Integer(i));
-    }
-    mockMsg = new SamzaSqlRelMessage(new ArrayList<String>() {{
-      this.add("test_list_field1");
-    }}, new ArrayList<Object>() {{
-      this.add(testList);
-    }}, new SamzaSqlRelMsgMetadata("", "", ""));
-    flattenedMsgs = flattenOp.getTransformFn().apply(mockMsg);
-    assertTrue(flattenedMsgs.size() == 10);
-    List<Integer> actualList = flattenedMsgs.stream()
-        .map(m -> ((List<Integer>) 
m.getSamzaSqlRelRecord().getFieldValues().get(0)).get(0))
-        .collect(ArrayList::new, (c, a) -> c.add(a), (c1, c2) -> 
c1.addAll(c2));
-    assertEquals(testList, actualList);
-
-    StreamOperatorSpec projectSpec = (StreamOperatorSpec) 
Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
-    assertNotNull(projectSpec);
-    assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
-
-    // Verify that the describe() method will establish the context for the 
map function
-    when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
-    
when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(testMetricsRegistryImpl);
-    Map<Integer, TranslatorContext> mockContexts= new HashMap<>();
-    mockContexts.put(1, mockTranslatorContext);
-    when(mockContext.getApplicationTaskContext()).thenReturn(new 
SamzaSqlApplicationContext(mockContexts));
-    projectSpec.getTransformFn().init(mockContext);
-    MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, 
"mapFn");
-    assertNotNull(mapFn);
-    assertEquals(mockTranslatorContext, Whitebox.getInternalState(mapFn, 
"translatorContext"));
-    assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
-    assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
-    // Verify TestMetricsRegistryImpl works with Project
-    assertEquals(1, testMetricsRegistryImpl.getGauges().size());
-    assertEquals(2, 
testMetricsRegistryImpl.getGauges().get(LOGICAL_OP_ID).size());
-    assertEquals(1, testMetricsRegistryImpl.getCounters().size());
-    assertEquals(2, 
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).size());
-    assertEquals(0, 
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
-    assertEquals(0, 
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
-    // Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
-    for (SamzaSqlRelMessage message : flattenedMsgs) {
-      mapFn.apply(message);
-    }
-    assertEquals(1, 
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
-    assertEquals(10, 
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
-
-    // Calling mapFn.apply() to verify the filter function is correctly 
applied to the input message
-    SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new 
ArrayList<>(), new ArrayList<>(),
-        new SamzaSqlRelMsgMetadata("", "", ""));
-    SamzaSqlExecutionContext executionContext = 
mock(SamzaSqlExecutionContext.class);
-    DataContext dataContext = mock(DataContext.class);
-    
when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);
-    when(mockTranslatorContext.getDataContext()).thenReturn(dataContext);
-    Object[] result = new Object[1];
-    final Object mockFieldObj = new Object();
-
-    doAnswer( invocation -> {
-      Object[] retValue = invocation.getArgumentAt(4, Object[].class);
-      retValue[0] = mockFieldObj;
-      return null;
-    }).when(mockExpr).execute(eq(executionContext), eq(mockContext), 
eq(dataContext),
-        eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()), 
eq(result));
-    SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
-    assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(),
-        new ArrayList<String>() {{
-          this.add("test_field");
-        }});
-    assertEquals(retMsg.getSamzaSqlRelRecord().getFieldValues(), new 
ArrayList<Object>() {{
-      this.add(mockFieldObj);
-    }});
-
-  }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 056acca..0c66d2f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -309,8 +309,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     runApplication(new MapConfig(staticConfigs));
 
     List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
-    // BUG Compound boolean checks dont work in calcite
-    Assert.assertEquals(0, outMessages.size());
+    Assert.assertEquals(numMessages / 2, outMessages.size());
   }
 
   @Test
@@ -418,6 +417,27 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
   }
 
   @Test
+  public void testEndToEndFlattenWithUdf() throws Exception {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 =
+        "Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id)) 
as id from testavro.SIMPLE1";
+    List<String> sqlStmts = Collections.singletonList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    List<OutgoingMessageEnvelope> outMessages = new 
ArrayList<>(TestAvroSystemFactory.messages);
+
+    int expectedMessages = 0;
+    // Flatten de-normalizes the data. So there is separate record for each 
entry in the array.
+    for (int index = 1; index < numMessages; index++) {
+      expectedMessages = expectedMessages + Math.max(1, index);
+    }
+    Assert.assertEquals(expectedMessages, outMessages.size());
+  }
+
+  @Test
   public void testEndToEndSubQuery() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();

Reply via email to