This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3b8dc03 Propagating the system messages to the stream. (#937)
3b8dc03 is described below
commit 3b8dc03b22bf37aeee192190daf26653de5d61ea
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Mon Mar 4 16:21:51 2019 -0800
Propagating the system messages to the stream. (#937)
* Adding system messages to the stream
* checkstyle fixes
---
.../samza/sql/data/SamzaSqlRelMsgMetadata.java | 24 +++++-
.../sql/runner/SamzaSqlApplicationConfig.java | 8 ++
.../samza/sql/translator/ProjectTranslator.java | 1 -
.../samza/sql/translator/QueryTranslator.java | 12 +++
.../samza/sql/translator/ScanTranslator.java | 98 +++++++++++++++-------
.../TranslatorInputMetricsMapFunction.java | 3 +-
.../TranslatorOutputMetricsMapFunction.java | 2 +-
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 25 +++++-
8 files changed, 133 insertions(+), 40 deletions(-)
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
index 713ecbe..14f2892 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMsgMetadata.java
@@ -20,6 +20,8 @@
package org.apache.samza.sql.data;
import java.io.Serializable;
+import java.time.Instant;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -36,9 +38,16 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
public boolean isNewInputMessage = true;
/**
- *
+ * Indicates whether the SamzaSqlMessage is a system message or not.
*/
- public String operatorBeginProcessingInstant = null;
+ @JsonIgnore
+ private boolean isSystemMessage = false;
+
+ /**
+ * Time at which the join operation started for the message.
+ * If there is no join node in the operator graph, this will be -1.
+ */
+ public long joinStartTimeMs = -1;
/**
@@ -93,7 +102,6 @@ public class SamzaSqlRelMsgMetadata implements Serializable {
public boolean hasArrivalTime() { return arrivalTime != null &&
!arrivalTime.isEmpty(); }
-
@JsonProperty("scanTime")
public String getscanTime() { return scanTime;}
@@ -103,6 +111,16 @@ public class SamzaSqlRelMsgMetadata implements
Serializable {
public boolean hasScanTime() { return scanTime != null &&
!scanTime.isEmpty(); }
+ @JsonIgnore
+ public void setIsSystemMessage(boolean isSystemMessage) {
+ this.isSystemMessage = isSystemMessage;
+ }
+
+ @JsonIgnore
+ public boolean isSystemMessage() {
+ return isSystemMessage;
+ }
+
@Override
public String toString() {
return "[Metadata:{" + eventTime + " " + arrivalTime + " " + scanTime +
"}]";
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 4883dfb..d521681 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -89,6 +89,7 @@ public class SamzaSqlApplicationConfig {
public static final String CFG_METADATA_TOPIC_PREFIX =
"samza.sql.metadataTopicPrefix";
public static final String CFG_GROUPBY_WINDOW_DURATION_MS =
"samza.sql.groupby.window.ms";
+ public static final String CFG_SQL_PROCESS_SYSTEM_EVENTS =
"samza.sql.processSystemEvents";
public static final String SAMZA_SYSTEM_LOG = "log";
@@ -115,6 +116,7 @@ public class SamzaSqlApplicationConfig {
private final String metadataTopicPrefix;
private final long windowDurationMs;
+ private final boolean processSystemEvents;
public SamzaSqlApplicationConfig(Config staticConfig, List<String>
inputSystemStreams,
List<String> outputSystemStreams) {
@@ -165,6 +167,8 @@ public class SamzaSqlApplicationConfig {
metadataTopicPrefix =
staticConfig.get(CFG_METADATA_TOPIC_PREFIX,
DEFAULT_METADATA_TOPIC_PREFIX);
+
+ processSystemEvents =
staticConfig.getBoolean(CFG_SQL_PROCESS_SYSTEM_EVENTS, true);
windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS,
DEFAULT_GROUPBY_WINDOW_DURATION_MS);
}
@@ -324,4 +328,8 @@ public class SamzaSqlApplicationConfig {
public long getWindowDurationMs() {
return windowDurationMs;
}
+
+ public boolean isProcessSystemEvents() {
+ return processSystemEvents;
+ }
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 3378788..6e6ff45 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -193,5 +193,4 @@ class ProjectTranslator {
context.registerMessageStream(project.getId(), outputStream);
context.registerRelNode(project.getId(), project);
}
-
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index d3c8fa9..ce4737c 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -50,6 +50,7 @@ import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
@@ -293,6 +294,17 @@ public class QueryTranslator {
GenericOutputDescriptor<KV<Object, Object>> osd =
sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
OutputStream stm =
outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v ->
appDesc.getOutputStream(osd));
outputStream.sendTo(stm);
+
+ // Process system events only if the output is a stream.
+ if (sqlConfig.isProcessSystemEvents()) {
+ for( MessageStream<SamzaSqlInputMessage> inputStream :
inputMsgStreams.values()) {
+ MessageStream<KV<Object, Object>> systemEventStream =
+ inputStream.filter(message ->
message.getMetadata().isSystemMessage())
+ .map(SamzaSqlInputMessage::getKeyAndMessageKV);
+
+ systemEventStream.sendTo(stm);
+ }
+ }
} else {
Table outputTable = appDesc.getTable(tableDescriptor.get());
if (outputTable == null) {
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index e044f6f..5fa04d8 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -1,21 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.translator;
@@ -36,8 +36,8 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.sql.SamzaSqlInputTransformer;
import org.apache.samza.sql.SamzaSqlInputMessage;
+import org.apache.samza.sql.SamzaSqlInputTransformer;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
@@ -48,6 +48,7 @@ import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+
/**
* Translator to translate the TableScans in relational graph to the
corresponding input streams in the StreamGraph
* implementation
@@ -78,7 +79,7 @@ class ScanTranslator {
@Override
public boolean apply(SamzaSqlInputMessage samzaSqlInputMessage) {
- return
!relConverter.isSystemMessage(samzaSqlInputMessage.getKeyAndMessageKV());
+ return !samzaSqlInputMessage.getMetadata().isSystemMessage();
}
}
@@ -147,11 +148,11 @@ class ScanTranslator {
queryInputEvents.inc();
processingTime.update(Duration.between(startProcessing,
endProcessing).toMillis());
}
-
} // ScanMapFunction
- void translate(final TableScan tableScan, final String queryLogicalId, final
String logicalOpId, final TranslatorContext context,
- Map<String, DelegatingSystemDescriptor> systemDescriptors, Map<String,
MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
+ void translate(final TableScan tableScan, final String queryLogicalId, final
String logicalOpId,
+ final TranslatorContext context, Map<String, DelegatingSystemDescriptor>
systemDescriptors,
+ Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams) {
StreamApplicationDescriptor streamAppDesc =
context.getStreamAppDescriptor();
List<String> tableNameParts = tableScan.getTable().getQualifiedName();
String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
@@ -162,9 +163,9 @@ class ScanTranslator {
final String streamId = sqlIOConfig.getStreamId();
final String source = sqlIOConfig.getSource();
- final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent()
&&
- (sqlIOConfig.getTableDescriptor().get() instanceof
RemoteTableDescriptor ||
- sqlIOConfig.getTableDescriptor().get() instanceof
CachingTableDescriptor);
+ final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent()
&& (
+ sqlIOConfig.getTableDescriptor().get() instanceof
RemoteTableDescriptor || sqlIOConfig.getTableDescriptor()
+ .get() instanceof CachingTableDescriptor);
// For remote table, we don't have an input stream descriptor. The table
descriptor is already defined by the
// SqlIOResolverFactory.
@@ -181,22 +182,55 @@ class ScanTranslator {
systemDescriptors.put(systemName, systemDescriptor);
} else {
/* in SamzaSQL, there should be no systemDescriptor setup by user, so
this branch happens only
- * in case of Fan-OUT (i.e., same input stream used in multiple sql
statements), or when same input
- * used twice in same sql statement (e.g., select ... from input as i1,
input as i2 ...), o.w., throw error */
+ * in case of Fan-OUT (i.e., same input stream used in multiple sql
statements), or when same input
+ * used twice in same sql statement (e.g., select ... from input as i1,
input as i2 ...), o.w., throw error */
if (systemDescriptor.getTransformer().isPresent()) {
InputTransformer existingTransformer =
systemDescriptor.getTransformer().get();
if (!(existingTransformer instanceof SamzaSqlInputTransformer)) {
- throw new SamzaException("SamzaSQL Exception: existing transformer
for " + systemName + " is not SamzaSqlInputTransformer");
+ throw new SamzaException(
+ "SamzaSQL Exception: existing transformer for " + systemName + "
is not SamzaSqlInputTransformer");
}
}
}
InputDescriptor inputDescriptor =
systemDescriptor.getInputDescriptor(streamId, new NoOpSerde<>());
- MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
- inputMsgStreams.computeIfAbsent(source, v ->
streamAppDesc.getInputStream(inputDescriptor))
- .filter(new FilterSystemMessageFunction(sourceName, queryId))
- .map(new ScanMapFunction(sourceName, queryId, queryLogicalId,
logicalOpId));
+
+ if (!inputMsgStreams.containsKey(source)) {
+ MessageStream<SamzaSqlInputMessage> inputMsgStream =
streamAppDesc.getInputStream(inputDescriptor);
+ inputMsgStreams.put(source, inputMsgStream.map(new
SystemMessageMapperFunction(source, queryId)));
+ }
+ MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
inputMsgStreams.get(source)
+ .filter(new FilterSystemMessageFunction(sourceName, queryId))
+ .map(new ScanMapFunction(sourceName, queryId, queryLogicalId,
logicalOpId));
context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
}
+
+ /**
+ * Function that populates whether the message is a system message.
+ * TODO This should ideally be populated by the InputTransformer in future.
+ */
+ private static class SystemMessageMapperFunction implements
MapFunction<SamzaSqlInputMessage, SamzaSqlInputMessage> {
+ private final String source;
+ private final int queryId;
+ private transient SamzaRelConverter relConverter;
+
+ public SystemMessageMapperFunction(String source, int queryId) {
+ this.source = source;
+ this.queryId = queryId;
+ }
+
+ @Override
+ public void init(Context context) {
+ TranslatorContext translatorContext =
+ ((SamzaSqlApplicationContext)
context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
+ relConverter = translatorContext.getMsgConverter(source);
+ }
+
+ @Override
+ public SamzaSqlInputMessage apply(SamzaSqlInputMessage message) {
+
message.getMetadata().setIsSystemMessage(relConverter.isSystemMessage(message.getKeyAndMessageKV()));
+ return message;
+ }
+ }
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
index bb3300a..ef3028e 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorInputMetricsMapFunction.java
@@ -19,7 +19,6 @@
package org.apache.samza.sql.translator;
-import com.google.common.annotations.VisibleForTesting;
import java.time.Instant;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
@@ -60,7 +59,7 @@ class TranslatorInputMetricsMapFunction implements
MapFunction<SamzaSqlRelMessag
@Override
public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
inputEvents.inc();
- message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant =
Instant.now().toString();
+ message.getSamzaSqlRelMsgMetadata().joinStartTimeMs =
Instant.now().toEpochMilli();
return message;
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
index f1757fb..3e85bed 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorOutputMetricsMapFunction.java
@@ -63,7 +63,7 @@ class TranslatorOutputMetricsMapFunction implements
MapFunction<SamzaSqlRelMessa
@Override
public SamzaSqlRelMessage apply(SamzaSqlRelMessage message) {
Instant endProcessing = Instant.now();
- Instant beginProcessing =
Instant.parse(message.getSamzaSqlRelMsgMetadata().operatorBeginProcessingInstant);
+ Instant beginProcessing =
Instant.ofEpochMilli(message.getSamzaSqlRelMsgMetadata().joinStartTimeMs);
outputEvents.inc();
processingTime.update(Duration.between(beginProcessing,
endProcessing).toMillis());
return message;
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index e69ae9a..e0264ee 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -107,6 +107,29 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
.map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
.sorted()
.collect(Collectors.toList());
+ Assert.assertEquals(numMessages, outMessages.size());
+ }
+
+ @Test
+ public void testEndToEndDisableSystemMessages() {
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ String avroSamzaToRelMsgConverterDomain =
+
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN,
"avro");
+ staticConfigs.put(avroSamzaToRelMsgConverterDomain +
SamzaSqlApplicationConfig.CFG_FACTORY,
+ SampleRelConverterFactory.class.getName());
+ String sql = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_PROCESS_SYSTEM_EVENTS,
"false");
+ runApplication(new MapConfig(staticConfigs));
+
+ List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
+ .sorted()
+ .collect(Collectors.toList());
Assert.assertEquals((numMessages + 1) / 2, outMessages.size());
}
@@ -174,7 +197,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
Assert.assertEquals(numMessages, outMessagesSet.size());
Assert.assertTrue(IntStream.range(0,
numMessages).boxed().collect(Collectors.toList()).equals(new
ArrayList<>(outMessagesSet)));
}
-
+
@Test
public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
int numMessages = 20;