This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new cb410d2 support for deleting rows from the table (#979)
cb410d2 is described below
commit cb410d2e14de85064e74cedacc1472f15f7d968c
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Thu Mar 28 16:43:13 2019 -0700
support for deleting rows from the table (#979)
---
.../org/apache/samza/sql/SamzaSqlRelRecord.java | 4 ++
.../org/apache/samza/sql/schema/SqlSchema.java | 4 ++
.../apache/samza/sql/data/SamzaSqlRelMessage.java | 4 ++
.../org/apache/samza/sql/planner/QueryPlanner.java | 24 +++++--
.../samza/sql/planner/RelSchemaConverter.java | 2 +-
.../samza/sql/translator/QueryTranslator.java | 80 ++++++++++++----------
.../test/samzasql/TestSamzaSqlRemoteTable.java | 13 ++--
7 files changed, 81 insertions(+), 50 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
index d156470..1dee734 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
@@ -120,4 +120,8 @@ public class SamzaSqlRelRecord implements Serializable {
String valueStr = Joiner.on(",").useForNull("null").join(fieldValues);
return "[Names:{" + nameStr + "} Values:{" + valueStr + "}]";
}
+
+ public boolean containsField(String name) {
+ return fieldNames.indexOf(name) != -1;
+ }
}
diff --git a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
index c542379..2d85281 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
@@ -84,6 +84,10 @@ public class SqlSchema {
.collect(Collectors.toList());
}
+ public boolean containsField(String keyName) {
+ return fields.stream().anyMatch(x -> x.getFieldName().equals(keyName));
+ }
+
public List<SqlField> getFields() {
return fields;
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 0ffc845..d538a8e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -38,6 +38,10 @@ public class SamzaSqlRelMessage implements Serializable {
public static final String KEY_NAME = "__key__";
+ public static final String OP_NAME = "__op__";
+
+ public static final String DELETE_OP = "DELETE";
+
// key could be a record in itself.
private final Object key;
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 8bccc2e..7114098 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -55,6 +55,8 @@ import org.apache.samza.SamzaException;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.schema.SamzaSqlFieldType;
+import org.apache.samza.sql.schema.SqlFieldSchema;
import org.apache.samza.sql.schema.SqlSchema;
import org.apache.samza.sql.interfaces.UdfMetadata;
import org.slf4j.Logger;
@@ -105,7 +107,21 @@ public class QueryPlanner {
} else {
// If the source part is the last one, then fetch the schema
corresponding to the stream and register.
SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
- RelDataType relationalSchema =
relSchemaConverter.convertToRelSchema(sqlSchema);
+
+ List<String> fieldNames = new ArrayList<>();
+ List<SqlFieldSchema> fieldTypes = new ArrayList<>();
+ if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
+ fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
+
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
+ }
+
+ fieldNames.addAll(
+
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
+ fieldTypes.addAll(
+
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
+
+ SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
+ RelDataType relationalSchema =
relSchemaConverter.convertToRelSchema(newSchema);
previousLevelSchema.add(sourcePart,
createTableFromRelSchema(relationalSchema));
break;
}
@@ -149,11 +165,7 @@ public class QueryPlanner {
private Table createTableFromRelSchema(RelDataType relationalSchema) {
return new AbstractTable() {
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- List<RelDataTypeField> fieldsList = new ArrayList<>();
- fieldsList.add(new RelDataTypeFieldImpl(SamzaSqlRelMessage.KEY_NAME, 0,
-
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY),
true)));
- fieldsList.addAll(relationalSchema.getFieldList());
- return new RelRecordType(fieldsList);
+ return relationalSchema;
}
};
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index 1f139c1..6634f5a 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -61,7 +61,7 @@ public class RelSchemaConverter extends SqlTypeFactoryImpl {
for (SqlSchema.SqlField field : fields) {
String fieldName = field.getFieldName();
- int fieldPos = field.getPosition() + 1;
+ int fieldPos = field.getPosition();
RelDataType dataType = getRelDataType(field.getFieldSchema());
relFields.add(new RelDataTypeFieldImpl(fieldName, fieldPos, dataType));
}
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index ce4737c..2460b1f 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -1,21 +1,21 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.samza.sql.translator;
@@ -50,7 +50,6 @@ import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
@@ -127,11 +126,15 @@ public class QueryTranslator {
ContainerContext containerContext = context.getContainerContext();
metricsRegistry = containerContext.getContainerMetricsRegistry();
/* insert (SendToOutputStream) metrics */
- insertProcessingTime = new SamzaHistogram(metricsRegistry,
insertLogicalId, TranslatorConstants.TOTAL_LATENCY_NAME);;
+ insertProcessingTime =
+ new SamzaHistogram(metricsRegistry, insertLogicalId,
TranslatorConstants.TOTAL_LATENCY_NAME);
+
/* query metrics */
- totalLatency = new SamzaHistogram(metricsRegistry, queryLogicalId,
TranslatorConstants.TOTAL_LATENCY_NAME);;
+ totalLatency = new SamzaHistogram(metricsRegistry, queryLogicalId,
TranslatorConstants.TOTAL_LATENCY_NAME);
+
queryLatency = new SamzaHistogram(metricsRegistry, queryLogicalId,
TranslatorConstants.QUERY_LATENCY_NAME);
- queueingLatency = new SamzaHistogram(metricsRegistry, queryLogicalId,
TranslatorConstants.QUEUEING_LATENCY_NAME);;
+ queueingLatency = new SamzaHistogram(metricsRegistry, queryLogicalId,
TranslatorConstants.QUEUEING_LATENCY_NAME);
+
queryOutputEvents = metricsRegistry.newCounter(queryLogicalId,
TranslatorConstants.OUTPUT_EVENTS_NAME);
queryOutputEvents.clear();
}
@@ -140,8 +143,14 @@ public class QueryTranslator {
public KV<Object, Object> apply(SamzaSqlRelMessage message) {
Instant beginProcessing = Instant.now();
KV<Object, Object> retKV =
this.samzaMsgConverter.convertToSamzaMessage(message);
+ if
(message.getSamzaSqlRelRecord().containsField(SamzaSqlRelMessage.OP_NAME)
+ && ((String)
message.getSamzaSqlRelRecord().getField(SamzaSqlRelMessage.OP_NAME).get()).equalsIgnoreCase(
+ SamzaSqlRelMessage.DELETE_OP)) {
+ // If it is a delete op, set the payload to null so that the record
gets deleted.
+ retKV = new KV<>(retKV.key, null);
+ }
updateMetrics(beginProcessing, Instant.now(),
message.getSamzaSqlRelMsgMetadata());
- return retKV;
+ return retKV;
}
/**
@@ -161,7 +170,7 @@ public class QueryTranslator {
Instant scanTime = Instant.parse(metadata.getscanTime());
queryLatency.update(Duration.between(scanTime, outputTime).toMillis());
/** TODO: change if hasArrivalTime to validation once arrivalTime is
assigned,
- and later remove the check once code is stable */
+ and later remove the check once code is stable */
if (metadata.hasArrivalTime()) {
Instant arrivalTime = Instant.parse(metadata.getarrivalTime());
queueingLatency.update(Duration.between(arrivalTime,
scanTime).toMillis());
@@ -171,7 +180,6 @@ public class QueryTranslator {
Instant eventTime = Instant.parse(metadata.getEventTime());
totalLatency.update(Duration.between(eventTime,
outputTime).toMillis());
}
-
}
} // OutputMapFunction
@@ -238,7 +246,8 @@ public class QueryTranslator {
public RelNode visit(TableScan scan) {
RelNode node = super.visit(scan);
String logicalOpId =
String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "scan", opId++);
- scanTranslator.translate(scan, queryLogicalId, logicalOpId,
translatorContext, systemDescriptors, inputMsgStreams);
+ scanTranslator.translate(scan, queryLogicalId, logicalOpId,
translatorContext, systemDescriptors,
+ inputMsgStreams);
return node;
}
@@ -262,8 +271,7 @@ public class QueryTranslator {
public RelNode visit(LogicalJoin join) {
RelNode node = super.visit(join);
String logicalOpId =
String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "join", opId++);
- new JoinTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix(),
queryId)
- .translate(join, translatorContext);
+ new JoinTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix(),
queryId).translate(join, translatorContext);
return node;
}
@@ -271,33 +279,35 @@ public class QueryTranslator {
public RelNode visit(LogicalAggregate aggregate) {
RelNode node = super.visit(aggregate);
String logicalOpId =
String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "window", opId++);
- new LogicalAggregateTranslator(logicalOpId,
sqlConfig.getMetadataTopicPrefix())
- .translate(aggregate, translatorContext);
+ new LogicalAggregateTranslator(logicalOpId,
sqlConfig.getMetadataTopicPrefix()).translate(aggregate,
+ translatorContext);
return node;
}
});
String logicalOpId = String.format(TranslatorConstants.LOGOPID_TEMPLATE,
queryId, "insert", opId);
- sendToOutputStream(queryLogicalId, logicalOpId, outputSystemStream,
streamAppDescriptor, translatorContext, node, queryId);
+ sendToOutputStream(queryLogicalId, logicalOpId, outputSystemStream,
streamAppDescriptor, translatorContext, node,
+ queryId);
}
- private void sendToOutputStream(String queryLogicalId, String logicalOpId,
String sinkStream, StreamApplicationDescriptor appDesc, TranslatorContext
translatorContext, RelNode node, int queryId) {
+ private void sendToOutputStream(String queryLogicalId, String logicalOpId,
String sinkStream,
+ StreamApplicationDescriptor appDesc, TranslatorContext
translatorContext, RelNode node, int queryId) {
SqlIOConfig sinkConfig =
sqlConfig.getOutputSystemStreamConfigsBySource().get(sinkStream);
MessageStream<SamzaSqlRelMessage> stream =
translatorContext.getMessageStream(node.getId());
- MessageStream<KV<Object, Object>> outputStream = stream.map(new
OutputMapFunction(queryLogicalId, logicalOpId, sinkStream, queryId));
+ MessageStream<KV<Object, Object>> outputStream =
+ stream.map(new OutputMapFunction(queryLogicalId, logicalOpId,
sinkStream, queryId));
Optional<TableDescriptor> tableDescriptor =
sinkConfig.getTableDescriptor();
if (!tableDescriptor.isPresent()) {
KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new
NoOpSerde<>());
String systemName = sinkConfig.getSystemName();
- DelegatingSystemDescriptor
- sd = systemDescriptors.computeIfAbsent(systemName,
DelegatingSystemDescriptor::new);
+ DelegatingSystemDescriptor sd =
systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
GenericOutputDescriptor<KV<Object, Object>> osd =
sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
OutputStream stm =
outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v ->
appDesc.getOutputStream(osd));
outputStream.sendTo(stm);
// Process system events only if the output is a stream.
if (sqlConfig.isProcessSystemEvents()) {
- for( MessageStream<SamzaSqlInputMessage> inputStream :
inputMsgStreams.values()) {
+ for (MessageStream<SamzaSqlInputMessage> inputStream :
inputMsgStreams.values()) {
MessageStream<KV<Object, Object>> systemEventStream =
inputStream.filter(message ->
message.getMetadata().isSystemMessage())
.map(SamzaSqlInputMessage::getKeyAndMessageKV);
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index 6f94d9f..d962b14 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -32,7 +32,6 @@ import org.apache.samza.sql.util.JsonUtil;
import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
@@ -53,8 +52,6 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
Assert.assertEquals(numMessages,
RemoteStoreIOResolverTestFactory.records.size());
}
- // SAMZA-2110 We need to enable this when we have a true support for Null
records
- @Ignore
@Test
public void testSinkEndToEndWithKeyWithNullRecords() {
int numMessages = 20;
@@ -65,8 +62,10 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(props, numMessages,
false, true);
- String sql = "Insert into testRemoteStore.testTable.`$table` select
__key__, id, name from testavro.SIMPLE1";
- List<String> sqlStmts = Arrays.asList(sql);
+ String sql1 = "Insert into testRemoteStore.testTable.`$table` select
__key__, id, name from testavro.SIMPLE1";
+ String sql2 = "Insert into testRemoteStore.testTable.`$table` select
__key__, 'DELETE' as __op__ from testavro.SIMPLE1 WHERE name IS NULL";
+
+ List<String> sqlStmts = Arrays.asList(sql1, sql2);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
runApplication(new MapConfig(staticConfigs));
@@ -215,8 +214,6 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
Assert.assertEquals(expectedOutMessages, outMessages);
}
- // SAMZA-2110 We need to enable this when we have a true support for null
records.
- @Ignore
@Test
public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
int numMessages = 21;
@@ -233,7 +230,7 @@ public class TestSamzaSqlRemoteTable extends
SamzaSqlIntegrationTestHarness {
// redundant here, keeping it just for testing purpose.
String sql =
"Insert into testRemoteStore.Profile.`$table` "
- + "select p.__key__ as __key__ "
+ + "select p.__key__ as __key__, 'DELETE' as __op__ "
+ "from testRemoteStore.Profile.`$table` as p "
+ "join testavro.PAGEVIEW as pv "
+ " on p.__key__ = pv.profileId ";