This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 1908622 Samza-2253: Upgrading calcite to 1.19 (#1085)
1908622 is described below
commit 190862207f6f6f6959725884a340762329097d1d
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Tue Jun 18 15:06:57 2019 -0700
Samza-2253: Upgrading calcite to 1.19 (#1085)
* Upgrading calcite to 1.19
* Fixing the flatten testcase and adding another test
* minor fixup
* Removing a flaky test
---
gradle/dependency-versions.gradle | 2 +-
.../apache/samza/sql/data/RexToJavaCompiler.java | 51 +++----
.../java/org/apache/samza/sql/fn/FlattenUdf.java | 2 +-
.../samza/sql/translator/ProjectTranslator.java | 73 +++++-----
.../sql/translator/TestProjectTranslator.java | 155 ---------------------
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 24 +++-
6 files changed, 88 insertions(+), 219 deletions(-)
diff --git a/gradle/dependency-versions.gradle
b/gradle/dependency-versions.gradle
index 9279051..6fe812f 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -19,7 +19,7 @@
ext {
apacheCommonsCollections4Version = "4.0"
avroVersion = "1.7.1"
- calciteVersion = "1.17.0"
+ calciteVersion = "1.19.0"
commonsCliVersion = "1.2"
commonsCodecVersion = "1.9"
commonsCollectionVersion = "3.2.1"
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
index 37ce229..bc881d3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/RexToJavaCompiler.java
@@ -1,21 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.data;
@@ -29,7 +29,6 @@ import java.lang.reflect.Type;
import java.util.List;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.JavaRowFormat;
-import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
@@ -47,6 +46,8 @@ import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.util.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
@@ -117,17 +118,17 @@ public class RexToJavaCompiler {
final ParameterExpression root = DataContext.ROOT;
final ParameterExpression inputValues =
Expressions.parameter(Object[].class, "inputValues");
final ParameterExpression outputValues =
Expressions.parameter(Object[].class, "outputValues");
- final JavaTypeFactoryImpl javaTypeFactory = new
SamzaSqlJavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+ final JavaTypeFactoryImpl javaTypeFactory =
+ new
SamzaSqlJavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
// public void execute(Object[] inputValues, Object[] outputValues)
final RexToLixTranslator.InputGetter inputGetter = new
RexToLixTranslator.InputGetterImpl(ImmutableList.of(
- Pair.of(
- Expressions.variable(Object[].class, "inputValues"),
+ Pair.of(Expressions.variable(Object[].class, "inputValues"),
PhysTypeImpl.of(javaTypeFactory, inputRowType,
JavaRowFormat.ARRAY, false))));
final List<org.apache.calcite.linq4j.tree.Expression> list =
- RexToLixTranslator.translateProjects(program, javaTypeFactory,
builder, null, DataContext.ROOT, inputGetter,
- null);
+ RexToLixTranslator.translateProjects(program, javaTypeFactory,
SqlConformanceEnum.DEFAULT, builder, null,
+ DataContext.ROOT, inputGetter, null);
for (int i = 0; i < list.size(); i++) {
builder.add(Expressions.statement(
Expressions.assign(Expressions.arrayIndex(outputValues,
Expressions.constant(i)), list.get(i))));
@@ -211,8 +212,8 @@ public class RexToJavaCompiler {
* Represents the methods in the class {@link Expression}
*/
public enum SamzaBuiltInMethod {
- EXPR_EXECUTE2(org.apache.samza.sql.data.Expression.class, "execute",
SamzaSqlExecutionContext.class,
- Context.class, DataContext.class, Object[].class, Object[].class);
+ EXPR_EXECUTE2(org.apache.samza.sql.data.Expression.class, "execute",
SamzaSqlExecutionContext.class, Context.class,
+ DataContext.class, Object[].class, Object[].class);
public final Method method;
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
index a24a992..1909fd4 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/FlattenUdf.java
@@ -36,6 +36,6 @@ public class FlattenUdf implements ScalarUdf {
@SamzaSqlUdfMethod(params = SamzaSqlFieldType.ARRAY)
public Object execute(List value) {
- return value != null && !value.isEmpty() ? value.get(0) : value;
+ return value;
}
}
\ No newline at end of file
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 8bbe84a..5b0dcc7 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -1,21 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.translator;
@@ -26,10 +26,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.samza.SamzaException;
@@ -57,6 +57,7 @@ class ProjectTranslator {
private static final Logger LOG =
LoggerFactory.getLogger(ProjectTranslator.class);
//private transient int messageIndex = 0;
private final int queryId;
+
ProjectTranslator(int queryId) {
this.queryId = queryId;
}
@@ -92,7 +93,8 @@ class ProjectTranslator {
@Override
public void init(Context context) {
this.context = context;
- this.translatorContext = ((SamzaSqlApplicationContext)
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+ this.translatorContext =
+ ((SamzaSqlApplicationContext)
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
this.project = (Project) this.translatorContext.getRelNode(projectId);
this.expr =
this.translatorContext.getExpressionCompiler().compile(project.getInputs(),
project.getProjects());
ContainerContext containerContext = context.getContainerContext();
@@ -137,7 +139,6 @@ class ProjectTranslator {
outputEvents.inc();
processingTime.update(Duration.between(arrivalTime,
outputTime).toMillis());
}
-
}
private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer
flattenIndex,
@@ -147,13 +148,14 @@ class ProjectTranslator {
if (field != null && field instanceof List) {
List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
SamzaSqlRelMsgMetadata messageMetadata =
message.getSamzaSqlRelMsgMetadata();
- SamzaSqlRelMsgMetadata newMetadata = new
SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(),
- messageMetadata.getarrivalTime(), messageMetadata.getscanTime(),
true);
+ SamzaSqlRelMsgMetadata newMetadata =
+ new SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(),
messageMetadata.getarrivalTime(),
+ messageMetadata.getscanTime(), true);
for (Object fieldValue : (List) field) {
List<Object> newValues = new
ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues());
newValues.set(flattenIndex, Collections.singletonList(fieldValue));
- outMessages.add(new
SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues,
- newMetadata));
+ outMessages.add(
+ new
SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues,
newMetadata));
newMetadata = new SamzaSqlRelMsgMetadata(newMetadata.getEventTime(),
newMetadata.getarrivalTime(),
newMetadata.getscanTime(), false);
}
@@ -170,14 +172,19 @@ class ProjectTranslator {
&& ((RexCall) rexNode).op.getName().equalsIgnoreCase("flatten");
}
- private Integer getProjectIndex(RexNode rexNode) {
- return ((RexInputRef) ((RexCall) rexNode).getOperands().get(0)).getIndex();
- }
-
void translate(final Project project, final String logicalOpId, final
TranslatorContext context) {
MessageStream<SamzaSqlRelMessage> messageStream =
context.getMessageStream(project.getInput().getId());
- List<Integer> flattenProjects =
-
project.getProjects().stream().filter(this::isFlatten).map(this::getProjectIndex).collect(Collectors.toList());
+
+ final int projectId = project.getId();
+
+ MessageStream<SamzaSqlRelMessage> outputStream =
+ messageStream.map(new ProjectMapFunction(projectId, queryId,
logicalOpId));
+
+ List<RexNode> projects = project.getProjects();
+ List<Integer> flattenProjects = IntStream.range(0, projects.size())
+ .filter(i -> this.isFlatten(projects.get(i)))
+ .boxed()
+ .collect(Collectors.toList());
if (flattenProjects.size() > 0) {
if (flattenProjects.size() > 1) {
@@ -185,13 +192,9 @@ class ProjectTranslator {
LOG.error(msg);
throw new SamzaException(msg);
}
- messageStream = translateFlatten(flattenProjects.get(0), messageStream);
+ outputStream = translateFlatten(flattenProjects.get(0), outputStream);
}
- final int projectId = project.getId();
-
- MessageStream<SamzaSqlRelMessage> outputStream = messageStream.map(new
ProjectMapFunction(projectId, queryId, logicalOpId));
-
context.registerMessageStream(project.getId(), outputStream);
context.registerRelNode(project.getId(), project);
}
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 0129284..3a2b943 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -171,159 +171,4 @@ public class TestProjectTranslator extends
TranslatorTestBase {
assertEquals(1,
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
}
-
-
- @Test
- public void testTranslateWithFlatten() throws IOException,
ClassNotFoundException {
- // setup mock values to the constructor of ProjectTranslator
- LogicalProject mockProject = PowerMockito.mock(LogicalProject.class);
- TranslatorContext mockTranslatorContext = mock(TranslatorContext.class);
- Context mockContext = mock(Context.class);
- ContainerContext mockContainerContext = mock(ContainerContext.class);
- TestMetricsRegistryImpl testMetricsRegistryImpl = new
TestMetricsRegistryImpl();
-
- RelNode mockInput = mock(RelNode.class);
- List<RelNode> inputs = new ArrayList<>();
- inputs.add(mockInput);
- when(mockInput.getId()).thenReturn(1);
- when(mockProject.getId()).thenReturn(2);
- when(mockProject.getInputs()).thenReturn(inputs);
- when(mockProject.getInput()).thenReturn(mockInput);
- RelDataType mockRowType = mock(RelDataType.class);
- when(mockRowType.getFieldCount()).thenReturn(1);
- when(mockProject.getRowType()).thenReturn(mockRowType);
- RexNode mockRexField = mock(RexNode.class);
- List<Pair<RexNode, String>> namedProjects = new ArrayList<>();
- namedProjects.add(Pair.of(mockRexField, "test_field"));
- when(mockProject.getNamedProjects()).thenReturn(namedProjects);
- List<RexNode> flattenProjects = new ArrayList<>();
- RexCall mockFlattenProject = mock(RexCall.class);
- SqlUserDefinedFunction sqlFlattenUdf = mock(SqlUserDefinedFunction.class);
- when(sqlFlattenUdf.getName()).thenReturn("flatten");
- List<RexNode> flattenUdfOperands = new ArrayList<>();
- RexInputRef rexInputRef = mock(RexInputRef.class);
- when(rexInputRef.getIndex()).thenReturn(0);
- flattenUdfOperands.add(rexInputRef);
- when(mockFlattenProject.getOperands()).thenReturn(flattenUdfOperands);
- Whitebox.setInternalState(mockFlattenProject, "op", sqlFlattenUdf);
- flattenProjects.add(mockFlattenProject);
- when(mockProject.getProjects()).thenReturn(flattenProjects);
-
- StreamApplicationDescriptorImpl mockAppDesc =
mock(StreamApplicationDescriptorImpl.class);
- OperatorSpec<Object, SamzaSqlRelMessage> mockInputOp = new
OperatorSpec(OperatorSpec.OpCode.INPUT, "1") {
-
- @Override
- public WatermarkFunction getWatermarkFn() {
- return null;
- }
-
- @Override
- public ScheduledFunction getScheduledFn() {
- return null;
- }
- };
-
- MessageStream<SamzaSqlRelMessage> mockStream = new
MessageStreamImpl<>(mockAppDesc, mockInputOp);
- when(mockTranslatorContext.getMessageStream(eq(1))).thenReturn(mockStream);
-
doAnswer(this.getRegisterMessageStreamAnswer()).when(mockTranslatorContext).registerMessageStream(eq(2),
any(MessageStream.class));
- RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class);
-
when(mockTranslatorContext.getExpressionCompiler()).thenReturn(mockCompiler);
- Expression mockExpr = mock(Expression.class);
- when(mockCompiler.compile(any(), any())).thenReturn(mockExpr);
-
- // Apply translate() method to verify that we are getting the correct map
operator constructed
- ProjectTranslator projectTranslator = new ProjectTranslator(1);
- projectTranslator.translate(mockProject, LOGICAL_OP_ID,
mockTranslatorContext);
- // make sure that context has been registered with LogicFilter and output
message streams
- verify(mockTranslatorContext, times(1)).registerRelNode(2, mockProject);
- verify(mockTranslatorContext, times(1)).registerMessageStream(2,
this.getRegisteredMessageStream(2));
- when(mockTranslatorContext.getRelNode(2)).thenReturn(mockProject);
-
when(mockTranslatorContext.getMessageStream(2)).thenReturn(this.getRegisteredMessageStream(2));
-
-
- Collection<OperatorSpec>
- nextOps = ((OperatorSpec) Whitebox.getInternalState(mockStream,
"operatorSpec")).getRegisteredOperatorSpecs();
- StreamOperatorSpec flattenOp = (StreamOperatorSpec)
nextOps.iterator().next();
- assertNotNull(flattenOp);
- Object testObj = new Object();
- SamzaSqlRelMessage mockMsg = new SamzaSqlRelMessage(new
ArrayList<String>() {{
- this.add("test_field_no1");
- }}, new ArrayList<Object>() {{
- this.add(testObj);
- }}, new SamzaSqlRelMsgMetadata("", "", ""));
- Collection<SamzaSqlRelMessage> flattenedMsgs =
flattenOp.getTransformFn().apply(mockMsg);
- assertTrue(flattenedMsgs.size() == 1);
- assertTrue(flattenedMsgs.stream().anyMatch(s ->
s.getSamzaSqlRelRecord().getFieldValues().get(0).equals(testObj)));
- List<Integer> testList = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- testList.add(new Integer(i));
- }
- mockMsg = new SamzaSqlRelMessage(new ArrayList<String>() {{
- this.add("test_list_field1");
- }}, new ArrayList<Object>() {{
- this.add(testList);
- }}, new SamzaSqlRelMsgMetadata("", "", ""));
- flattenedMsgs = flattenOp.getTransformFn().apply(mockMsg);
- assertTrue(flattenedMsgs.size() == 10);
- List<Integer> actualList = flattenedMsgs.stream()
- .map(m -> ((List<Integer>)
m.getSamzaSqlRelRecord().getFieldValues().get(0)).get(0))
- .collect(ArrayList::new, (c, a) -> c.add(a), (c1, c2) ->
c1.addAll(c2));
- assertEquals(testList, actualList);
-
- StreamOperatorSpec projectSpec = (StreamOperatorSpec)
Whitebox.getInternalState(this.getRegisteredMessageStream(2), "operatorSpec");
- assertNotNull(projectSpec);
- assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
-
- // Verify that the describe() method will establish the context for the
map function
- when(mockContext.getContainerContext()).thenReturn(mockContainerContext);
-
when(mockContainerContext.getContainerMetricsRegistry()).thenReturn(testMetricsRegistryImpl);
- Map<Integer, TranslatorContext> mockContexts= new HashMap<>();
- mockContexts.put(1, mockTranslatorContext);
- when(mockContext.getApplicationTaskContext()).thenReturn(new
SamzaSqlApplicationContext(mockContexts));
- projectSpec.getTransformFn().init(mockContext);
- MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec,
"mapFn");
- assertNotNull(mapFn);
- assertEquals(mockTranslatorContext, Whitebox.getInternalState(mapFn,
"translatorContext"));
- assertEquals(mockProject, Whitebox.getInternalState(mapFn, "project"));
- assertEquals(mockExpr, Whitebox.getInternalState(mapFn, "expr"));
- // Verify TestMetricsRegistryImpl works with Project
- assertEquals(1, testMetricsRegistryImpl.getGauges().size());
- assertEquals(2,
testMetricsRegistryImpl.getGauges().get(LOGICAL_OP_ID).size());
- assertEquals(1, testMetricsRegistryImpl.getCounters().size());
- assertEquals(2,
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).size());
- assertEquals(0,
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
- assertEquals(0,
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
- // Verify mapFn.apply() updates the TestMetricsRegistryImpl metrics
- for (SamzaSqlRelMessage message : flattenedMsgs) {
- mapFn.apply(message);
- }
- assertEquals(1,
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(0).getCount());
- assertEquals(10,
testMetricsRegistryImpl.getCounters().get(LOGICAL_OP_ID).get(1).getCount());
-
- // Calling mapFn.apply() to verify the filter function is correctly
applied to the input message
- SamzaSqlRelMessage mockInputMsg = new SamzaSqlRelMessage(new
ArrayList<>(), new ArrayList<>(),
- new SamzaSqlRelMsgMetadata("", "", ""));
- SamzaSqlExecutionContext executionContext =
mock(SamzaSqlExecutionContext.class);
- DataContext dataContext = mock(DataContext.class);
-
when(mockTranslatorContext.getExecutionContext()).thenReturn(executionContext);
- when(mockTranslatorContext.getDataContext()).thenReturn(dataContext);
- Object[] result = new Object[1];
- final Object mockFieldObj = new Object();
-
- doAnswer( invocation -> {
- Object[] retValue = invocation.getArgumentAt(4, Object[].class);
- retValue[0] = mockFieldObj;
- return null;
- }).when(mockExpr).execute(eq(executionContext), eq(mockContext),
eq(dataContext),
- eq(mockInputMsg.getSamzaSqlRelRecord().getFieldValues().toArray()),
eq(result));
- SamzaSqlRelMessage retMsg = (SamzaSqlRelMessage) mapFn.apply(mockInputMsg);
- assertEquals(retMsg.getSamzaSqlRelRecord().getFieldNames(),
- new ArrayList<String>() {{
- this.add("test_field");
- }});
- assertEquals(retMsg.getSamzaSqlRelRecord().getFieldValues(), new
ArrayList<Object>() {{
- this.add(mockFieldObj);
- }});
-
- }
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 056acca..0c66d2f 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -309,8 +309,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
runApplication(new MapConfig(staticConfigs));
List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
- // BUG Compound boolean checks dont work in calcite
- Assert.assertEquals(0, outMessages.size());
+ Assert.assertEquals(numMessages / 2, outMessages.size());
}
@Test
@@ -418,6 +417,27 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
}
@Test
+ public void testEndToEndFlattenWithUdf() throws Exception {
+ int numMessages = 20;
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ String sql1 =
+ "Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id))
as id from testavro.SIMPLE1";
+ List<String> sqlStmts = Collections.singletonList(sql1);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
+ runApplication(new MapConfig(staticConfigs));
+
+ List<OutgoingMessageEnvelope> outMessages = new
ArrayList<>(TestAvroSystemFactory.messages);
+
+ int expectedMessages = 0;
+ // Flatten de-normalizes the data, so there is a separate record for each
entry in the array.
+ for (int index = 1; index < numMessages; index++) {
+ expectedMessages = expectedMessages + Math.max(1, index);
+ }
+ Assert.assertEquals(expectedMessages, outMessages.size());
+ }
+
+ @Test
public void testEndToEndSubQuery() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();