This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new cb410d2  support for deleting rows from the table (#979)
cb410d2 is described below

commit cb410d2e14de85064e74cedacc1472f15f7d968c
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Thu Mar 28 16:43:13 2019 -0700

    support for deleting rows from the table (#979)
---
 .../org/apache/samza/sql/SamzaSqlRelRecord.java    |  4 ++
 .../org/apache/samza/sql/schema/SqlSchema.java     |  4 ++
 .../apache/samza/sql/data/SamzaSqlRelMessage.java  |  4 ++
 .../org/apache/samza/sql/planner/QueryPlanner.java | 24 +++++--
 .../samza/sql/planner/RelSchemaConverter.java      |  2 +-
 .../samza/sql/translator/QueryTranslator.java      | 80 ++++++++++++----------
 .../test/samzasql/TestSamzaSqlRemoteTable.java     | 13 ++--
 7 files changed, 81 insertions(+), 50 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java 
b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
index d156470..1dee734 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
@@ -120,4 +120,8 @@ public class SamzaSqlRelRecord implements Serializable {
     String valueStr = Joiner.on(",").useForNull("null").join(fieldValues);
     return "[Names:{" + nameStr + "} Values:{" + valueStr + "}]";
   }
+
+  public boolean containsField(String name) {
+    return fieldNames.indexOf(name) != -1;
+  }
 }
diff --git a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java 
b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
index c542379..2d85281 100644
--- a/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
+++ b/samza-api/src/main/java/org/apache/samza/sql/schema/SqlSchema.java
@@ -84,6 +84,10 @@ public class SqlSchema {
         .collect(Collectors.toList());
   }
 
+  public boolean containsField(String keyName) {
+    return fields.stream().anyMatch(x -> x.getFieldName().equals(keyName));
+  }
+
   public List<SqlField> getFields() {
     return fields;
   }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
index 0ffc845..d538a8e 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
@@ -38,6 +38,10 @@ public class SamzaSqlRelMessage implements Serializable {
 
   public static final String KEY_NAME = "__key__";
 
+  public static final String OP_NAME = "__op__";
+
+  public static final String DELETE_OP = "DELETE";
+
   // key could be a record in itself.
   private final Object key;
 
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 8bccc2e..7114098 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -55,6 +55,8 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.schema.SamzaSqlFieldType;
+import org.apache.samza.sql.schema.SqlFieldSchema;
 import org.apache.samza.sql.schema.SqlSchema;
 import org.apache.samza.sql.interfaces.UdfMetadata;
 import org.slf4j.Logger;
@@ -105,7 +107,21 @@ public class QueryPlanner {
           } else {
             // If the source part is the last one, then fetch the schema 
corresponding to the stream and register.
             SqlSchema sqlSchema = relSchemaProvider.getSqlSchema();
-            RelDataType relationalSchema = 
relSchemaConverter.convertToRelSchema(sqlSchema);
+
+            List<String> fieldNames = new ArrayList<>();
+            List<SqlFieldSchema> fieldTypes = new ArrayList<>();
+            if (!sqlSchema.containsField(SamzaSqlRelMessage.KEY_NAME)) {
+              fieldNames.add(SamzaSqlRelMessage.KEY_NAME);
+              
fieldTypes.add(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.ANY));
+            }
+
+            fieldNames.addAll(
+                
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldName).collect(Collectors.toList()));
+            fieldTypes.addAll(
+                
sqlSchema.getFields().stream().map(SqlSchema.SqlField::getFieldSchema).collect(Collectors.toList()));
+
+            SqlSchema newSchema = new SqlSchema(fieldNames, fieldTypes);
+            RelDataType relationalSchema = 
relSchemaConverter.convertToRelSchema(newSchema);
             previousLevelSchema.add(sourcePart, 
createTableFromRelSchema(relationalSchema));
             break;
           }
@@ -149,11 +165,7 @@ public class QueryPlanner {
   private Table createTableFromRelSchema(RelDataType relationalSchema) {
     return new AbstractTable() {
       public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-        List<RelDataTypeField> fieldsList = new ArrayList<>();
-        fieldsList.add(new RelDataTypeFieldImpl(SamzaSqlRelMessage.KEY_NAME, 0,
-            
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY),
 true)));
-        fieldsList.addAll(relationalSchema.getFieldList());
-        return new RelRecordType(fieldsList);
+        return relationalSchema;
       }
     };
   }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
index 1f139c1..6634f5a 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/RelSchemaConverter.java
@@ -61,7 +61,7 @@ public class RelSchemaConverter extends SqlTypeFactoryImpl {
 
     for (SqlSchema.SqlField field : fields) {
       String fieldName = field.getFieldName();
-      int fieldPos = field.getPosition() + 1;
+      int fieldPos = field.getPosition();
       RelDataType dataType = getRelDataType(field.getFieldSchema());
       relFields.add(new RelDataTypeFieldImpl(fieldName, fieldPos, dataType));
     }
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index ce4737c..2460b1f 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -1,21 +1,21 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.samza.sql.translator;
 
@@ -50,7 +50,6 @@ import org.apache.samza.metrics.SamzaHistogram;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -127,11 +126,15 @@ public class QueryTranslator {
       ContainerContext containerContext = context.getContainerContext();
       metricsRegistry = containerContext.getContainerMetricsRegistry();
       /* insert (SendToOutputStream) metrics */
-      insertProcessingTime = new SamzaHistogram(metricsRegistry, 
insertLogicalId, TranslatorConstants.TOTAL_LATENCY_NAME);;
+      insertProcessingTime =
+          new SamzaHistogram(metricsRegistry, insertLogicalId, 
TranslatorConstants.TOTAL_LATENCY_NAME);
+
       /* query metrics */
-      totalLatency = new SamzaHistogram(metricsRegistry, queryLogicalId, 
TranslatorConstants.TOTAL_LATENCY_NAME);;
+      totalLatency = new SamzaHistogram(metricsRegistry, queryLogicalId, 
TranslatorConstants.TOTAL_LATENCY_NAME);
+
       queryLatency = new SamzaHistogram(metricsRegistry, queryLogicalId, 
TranslatorConstants.QUERY_LATENCY_NAME);
-      queueingLatency = new SamzaHistogram(metricsRegistry, queryLogicalId, 
TranslatorConstants.QUEUEING_LATENCY_NAME);;
+      queueingLatency = new SamzaHistogram(metricsRegistry, queryLogicalId, 
TranslatorConstants.QUEUEING_LATENCY_NAME);
+      
       queryOutputEvents = metricsRegistry.newCounter(queryLogicalId, 
TranslatorConstants.OUTPUT_EVENTS_NAME);
       queryOutputEvents.clear();
     }
@@ -140,8 +143,14 @@ public class QueryTranslator {
     public KV<Object, Object> apply(SamzaSqlRelMessage message) {
       Instant beginProcessing = Instant.now();
       KV<Object, Object> retKV = 
this.samzaMsgConverter.convertToSamzaMessage(message);
+      if 
(message.getSamzaSqlRelRecord().containsField(SamzaSqlRelMessage.OP_NAME)
+          && ((String) 
message.getSamzaSqlRelRecord().getField(SamzaSqlRelMessage.OP_NAME).get()).equalsIgnoreCase(
+          SamzaSqlRelMessage.DELETE_OP)) {
+        // If it is a delete op, set the payload to null so that the record 
gets deleted.
+        retKV = new KV<>(retKV.key, null);
+      }
       updateMetrics(beginProcessing, Instant.now(), 
message.getSamzaSqlRelMsgMetadata());
-      return  retKV;
+      return retKV;
     }
 
     /**
@@ -161,7 +170,7 @@ public class QueryTranslator {
       Instant scanTime = Instant.parse(metadata.getscanTime());
       queryLatency.update(Duration.between(scanTime, outputTime).toMillis());
       /** TODO: change if hasArrivalTime to validation once arrivalTime is 
assigned,
-                and later remove the check once code is stable */
+       and later remove the check once code is stable */
       if (metadata.hasArrivalTime()) {
         Instant arrivalTime = Instant.parse(metadata.getarrivalTime());
         queueingLatency.update(Duration.between(arrivalTime, 
scanTime).toMillis());
@@ -171,7 +180,6 @@ public class QueryTranslator {
         Instant eventTime = Instant.parse(metadata.getEventTime());
         totalLatency.update(Duration.between(eventTime, 
outputTime).toMillis());
       }
-
     }
   } // OutputMapFunction
 
@@ -238,7 +246,8 @@ public class QueryTranslator {
       public RelNode visit(TableScan scan) {
         RelNode node = super.visit(scan);
         String logicalOpId = 
String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "scan", opId++);
-        scanTranslator.translate(scan, queryLogicalId, logicalOpId, 
translatorContext, systemDescriptors, inputMsgStreams);
+        scanTranslator.translate(scan, queryLogicalId, logicalOpId, 
translatorContext, systemDescriptors,
+            inputMsgStreams);
         return node;
       }
 
@@ -262,8 +271,7 @@ public class QueryTranslator {
       public RelNode visit(LogicalJoin join) {
         RelNode node = super.visit(join);
         String logicalOpId = 
String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "join", opId++);
-        new JoinTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix(), 
queryId)
-            .translate(join, translatorContext);
+        new JoinTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix(), 
queryId).translate(join, translatorContext);
         return node;
       }
 
@@ -271,33 +279,35 @@ public class QueryTranslator {
       public RelNode visit(LogicalAggregate aggregate) {
         RelNode node = super.visit(aggregate);
         String logicalOpId = 
String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, "window", opId++);
-        new LogicalAggregateTranslator(logicalOpId, 
sqlConfig.getMetadataTopicPrefix())
-            .translate(aggregate, translatorContext);
+        new LogicalAggregateTranslator(logicalOpId, 
sqlConfig.getMetadataTopicPrefix()).translate(aggregate,
+            translatorContext);
         return node;
       }
     });
 
     String logicalOpId = String.format(TranslatorConstants.LOGOPID_TEMPLATE, 
queryId, "insert", opId);
-    sendToOutputStream(queryLogicalId, logicalOpId, outputSystemStream, 
streamAppDescriptor, translatorContext, node, queryId);
+    sendToOutputStream(queryLogicalId, logicalOpId, outputSystemStream, 
streamAppDescriptor, translatorContext, node,
+        queryId);
   }
 
-  private void sendToOutputStream(String queryLogicalId, String logicalOpId, 
String sinkStream, StreamApplicationDescriptor appDesc, TranslatorContext 
translatorContext, RelNode node, int queryId) {
+  private void sendToOutputStream(String queryLogicalId, String logicalOpId, 
String sinkStream,
+      StreamApplicationDescriptor appDesc, TranslatorContext 
translatorContext, RelNode node, int queryId) {
     SqlIOConfig sinkConfig = 
sqlConfig.getOutputSystemStreamConfigsBySource().get(sinkStream);
     MessageStream<SamzaSqlRelMessage> stream = 
translatorContext.getMessageStream(node.getId());
-    MessageStream<KV<Object, Object>> outputStream = stream.map(new 
OutputMapFunction(queryLogicalId, logicalOpId, sinkStream, queryId));
+    MessageStream<KV<Object, Object>> outputStream =
+        stream.map(new OutputMapFunction(queryLogicalId, logicalOpId, 
sinkStream, queryId));
     Optional<TableDescriptor> tableDescriptor = 
sinkConfig.getTableDescriptor();
     if (!tableDescriptor.isPresent()) {
       KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>());
       String systemName = sinkConfig.getSystemName();
-      DelegatingSystemDescriptor
-          sd = systemDescriptors.computeIfAbsent(systemName, 
DelegatingSystemDescriptor::new);
+      DelegatingSystemDescriptor sd = 
systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
       GenericOutputDescriptor<KV<Object, Object>> osd = 
sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
       OutputStream stm = 
outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> 
appDesc.getOutputStream(osd));
       outputStream.sendTo(stm);
 
       // Process system events only if the output is a stream.
       if (sqlConfig.isProcessSystemEvents()) {
-        for( MessageStream<SamzaSqlInputMessage> inputStream :  
inputMsgStreams.values()) {
+        for (MessageStream<SamzaSqlInputMessage> inputStream : 
inputMsgStreams.values()) {
           MessageStream<KV<Object, Object>> systemEventStream =
               inputStream.filter(message -> 
message.getMetadata().isSystemMessage())
                   .map(SamzaSqlInputMessage::getKeyAndMessageKV);
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index 6f94d9f..d962b14 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -32,7 +32,6 @@ import org.apache.samza.sql.util.JsonUtil;
 import org.apache.samza.sql.util.SamzaSqlTestConfig;
 import org.apache.samza.sql.util.RemoteStoreIOResolverTestFactory;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 
@@ -53,8 +52,6 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(numMessages, 
RemoteStoreIOResolverTestFactory.records.size());
   }
 
-  // SAMZA-2110 We need to enable this when we have a true support for Null 
records
-  @Ignore
   @Test
   public void testSinkEndToEndWithKeyWithNullRecords() {
     int numMessages = 20;
@@ -65,8 +62,10 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
     Map<String, String> staticConfigs =
         SamzaSqlTestConfig.fetchStaticConfigsWithFactories(props, numMessages, 
false, true);
 
-    String sql = "Insert into testRemoteStore.testTable.`$table` select 
__key__, id, name from testavro.SIMPLE1";
-    List<String> sqlStmts = Arrays.asList(sql);
+    String sql1 = "Insert into testRemoteStore.testTable.`$table` select 
__key__, id, name from testavro.SIMPLE1";
+    String sql2 = "Insert into testRemoteStore.testTable.`$table` select 
__key__, 'DELETE' as __op__ from testavro.SIMPLE1 WHERE name IS NULL";
+
+    List<String> sqlStmts = Arrays.asList(sql1, sql2);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     runApplication(new MapConfig(staticConfigs));
 
@@ -215,8 +214,6 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(expectedOutMessages, outMessages);
   }
 
-  // SAMZA-2110 We need to enable this when we have a true support for null 
records.
-  @Ignore
   @Test
   public void testSameJoinTargetSinkEndToEndRightOuterJoin() {
     int numMessages = 21;
@@ -233,7 +230,7 @@ public class TestSamzaSqlRemoteTable extends 
SamzaSqlIntegrationTestHarness {
     // redundant here, keeping it just for testing purpose.
     String sql =
         "Insert into testRemoteStore.Profile.`$table` "
-            + "select p.__key__ as __key__ "
+            + "select p.__key__ as __key__, 'DELETE' as __op__ "
             + "from testRemoteStore.Profile.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
             + " on p.__key__ = pv.profileId ";

Reply via email to