Repository: samza Updated Branches: refs/heads/master 64c82634c -> dcd4b558a
SAMZA-1968: Samza-sql - Change Calcite sql type for samza sql rel message __key__ to accept any format Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #774 from atoomula/keyformat Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dcd4b558 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dcd4b558 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dcd4b558 Branch: refs/heads/master Commit: dcd4b558a2c702f5b5a320fdb9d0c3fcadabd09b Parents: 64c8263 Author: Aditya Toomula <[email protected]> Authored: Tue Oct 30 11:25:00 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Tue Oct 30 11:25:00 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/sql/SamzaSqlRelRecord.java | 13 +- .../sql/client/impl/SamzaExecutorTest.java | 2 +- .../apache/samza/sql/avro/AvroRelConverter.java | 59 ++++--- .../samza/sql/avro/AvroRelSchemaProvider.java | 5 + .../samza/sql/data/SamzaSqlCompositeKey.java | 82 --------- .../samza/sql/data/SamzaSqlRelMessage.java | 52 +++++- .../apache/samza/sql/fn/ConvertToStringUdf.java | 39 +++++ .../org/apache/samza/sql/fn/GetSqlFieldUdf.java | 95 +++++++++++ .../sql/impl/ConfigBasedIOResolverFactory.java | 14 +- .../apache/samza/sql/planner/QueryPlanner.java | 2 +- .../samza/sql/translator/JoinTranslator.java | 33 ++-- .../SamzaSqlRelMessageJoinFunction.java | 23 ++- .../samza/sql/avro/TestAvroRelConversion.java | 10 +- .../samza/sql/data/TestSamzaSqlRelMessage.java | 30 ++++ .../samza/sql/fn/TestConvertToStringUdf.java | 62 +++++++ .../apache/samza/sql/fn/TestGetSqlFieldUdf.java | 166 +++++++++++++++++++ .../samza/sql/system/TestAvroSystemFactory.java | 6 +- .../samza/sql/testutil/SamzaSqlTestConfig.java | 1 - .../sql/testutil/TestIOResolverFactory.java | 13 +- .../TestSamzaSqlRelMessageJoinFunction.java | 21 ++- 
.../tools/avro/AvroSchemaGenRelConverter.java | 12 +- 21 files changed, 570 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java index e17a273..a877e6b 100644 --- a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java +++ b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java @@ -19,6 +19,7 @@ package org.apache.samza.sql; +import com.google.common.base.Joiner; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -40,6 +41,7 @@ public class SamzaSqlRelRecord implements Serializable { private final ArrayList<String> fieldNames; @JsonProperty("fieldValues") private final ArrayList<Object> fieldValues; + private final int hashCode; /** * Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values. 
@@ -59,6 +61,8 @@ public class SamzaSqlRelRecord implements Serializable { this.fieldNames.addAll(fieldNames); this.fieldValues.addAll(fieldValues); + + hashCode = Objects.hash(fieldNames, fieldValues); } /** @@ -96,7 +100,7 @@ public class SamzaSqlRelRecord implements Serializable { @Override public int hashCode() { - return Objects.hash(fieldNames, fieldValues); + return hashCode; } @Override @@ -110,4 +114,11 @@ public class SamzaSqlRelRecord implements Serializable { SamzaSqlRelRecord other = (SamzaSqlRelRecord) obj; return Objects.equals(fieldNames, other.fieldNames) && Objects.equals(fieldValues, other.fieldValues); } + + @Override + public String toString() { + String nameStr = Joiner.on(",").join(fieldNames); + String valueStr = Joiner.on(",").useForNull("null").join(fieldValues); + return "[Names:{" + nameStr + "} Values:{" + valueStr + "}]"; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java ---------------------------------------------------------------------- diff --git a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java index 18fe4b7..91ec7f6 100644 --- a/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java +++ b/samza-sql-shell/src/test/java/org/apache/samza/sql/client/impl/SamzaExecutorTest.java @@ -61,7 +61,7 @@ public class SamzaExecutorTest { Assert.assertEquals("NewCompany", ts.getFieldName(2)); Assert.assertEquals("OldCompany", ts.getFieldName(3)); Assert.assertEquals("ProfileChangeTimestamp", ts.getFieldName(4)); - Assert.assertEquals("VARCHAR", ts.getFieldTypeName(0)); + Assert.assertEquals("ANY", ts.getFieldTypeName(0)); Assert.assertEquals("VARCHAR", ts.getFieldTypeName(1)); Assert.assertEquals("VARCHAR", ts.getFieldTypeName(2)); Assert.assertEquals("VARCHAR", ts.getFieldTypeName(3)); 
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java index 7d97466..89026ee 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java @@ -61,13 +61,13 @@ import org.slf4j.LoggerFactory; public class AvroRelConverter implements SamzaRelConverter { protected final Config config; - private final Schema avroSchema; + private final Schema payloadSchema; private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class); public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) { this.config = config; - this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream)); + this.payloadSchema = Schema.parse(schemaProvider.getSchema(systemStream)); } /** @@ -76,45 +76,50 @@ public class AvroRelConverter implements SamzaRelConverter { */ @Override public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) { - List<Object> fieldValues = new ArrayList<>(); - List<String> fieldNames = new ArrayList<>(); + List<String> payloadFieldNames = new ArrayList<>(); + List<Object> payloadFieldValues = new ArrayList<>(); Object value = samzaMessage.getValue(); if (value instanceof IndexedRecord) { - IndexedRecord record = (IndexedRecord) value; - // Please note that record schema and cached schema could be different due to schema evolution. - // Always represent record schema in the form of cached schema. This approach has the side-effect - // of dropping the newly added fields in the scenarios where the record schema has newer version - // than the cached schema. 
[TODO: SAMZA-1679] - Schema recordSchema = record.getSchema(); - fieldNames.addAll(avroSchema.getFields().stream() - .map(Schema.Field::name) - .collect(Collectors.toList())); - fieldValues.addAll(fieldNames.stream() - .map(f -> convertToJavaObject( - recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null, - getNonNullUnionSchema(avroSchema.getField(f).schema()))) - .collect(Collectors.toList())); + fetchFieldNamesAndValuesFromIndexedRecord((IndexedRecord) value, payloadFieldNames, payloadFieldValues, + payloadSchema); } else if (value == null) { - fieldNames.addAll(avroSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList())); - IntStream.range(0, fieldNames.size()).forEach(x -> fieldValues.add(null)); + payloadFieldNames.addAll(payloadSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList())); + IntStream.range(0, payloadFieldNames.size()).forEach(x -> payloadFieldValues.add(null)); } else { String msg = "Avro message converter doesn't support messages of type " + value.getClass(); LOG.error(msg); throw new SamzaException(msg); } - return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, fieldValues); + return new SamzaSqlRelMessage(samzaMessage.getKey(), payloadFieldNames, payloadFieldValues); + } + + public void fetchFieldNamesAndValuesFromIndexedRecord(IndexedRecord record, List<String> fieldNames, + List<Object> fieldValues, Schema cachedSchema) { + // Please note that record schema and cached schema could be different due to schema evolution. + // Always represent record schema in the form of cached schema. This approach has the side-effect + // of dropping the newly added fields in the scenarios where the record schema has newer version + // than the cached schema. 
[TODO: SAMZA-1679] + Schema recordSchema = record.getSchema(); + fieldNames.addAll(cachedSchema.getFields().stream() + .map(Schema.Field::name) + .collect(Collectors.toList())); + fieldValues.addAll(fieldNames.stream() + .map(f -> convertToJavaObject( + recordSchema.getField(f) != null ? record.get(recordSchema.getField(f).pos()) : null, + getNonNullUnionSchema(payloadSchema.getField(f).schema()))) + .collect(Collectors.toList())); } private SamzaSqlRelRecord convertToRelRecord(IndexedRecord avroRecord) { - List<Object> values = new ArrayList<>(); + List<Object> fieldValues = new ArrayList<>(); List<String> fieldNames = new ArrayList<>(); if (avroRecord != null) { fieldNames.addAll(avroRecord.getSchema().getFields() .stream() .map(Schema.Field::name) .collect(Collectors.toList())); - values.addAll(avroRecord.getSchema().getFields() + fieldValues.addAll(avroRecord.getSchema().getFields() .stream() .map(f -> convertToJavaObject(avroRecord.get(avroRecord.getSchema().getField(f.name()).pos()), getNonNullUnionSchema(avroRecord.getSchema().getField(f.name()).schema()))) @@ -125,7 +130,7 @@ public class AvroRelConverter implements SamzaRelConverter { throw new SamzaException(msg); } - return new SamzaSqlRelRecord(fieldNames, values); + return new SamzaSqlRelRecord(fieldNames, fieldValues); } /** @@ -133,11 +138,11 @@ public class AvroRelConverter implements SamzaRelConverter { */ @Override public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) { - return convertToSamzaMessage(relMessage, this.avroSchema); + return convertToSamzaMessage(relMessage, this.payloadSchema); } - protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema avroSchema) { - return new KV<>(relMessage.getKey(), convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), avroSchema)); + protected KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage, Schema payloadSchema) { + return new KV<>(relMessage.getKey(), 
convertToGenericRecord(relMessage.getSamzaSqlRelRecord(), payloadSchema)); } private GenericRecord convertToGenericRecord(SamzaSqlRelRecord relRecord, Schema schema) { http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java index fb11624..f37c740 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelSchemaProvider.java @@ -24,5 +24,10 @@ import org.apache.samza.system.SystemStream; public interface AvroRelSchemaProvider extends RelSchemaProvider { + /** + * Get payload schema corresponding to the system stream. + * @param systemStream system stream for which payload schema needs to be obtained. + * @return schema in the form of string + */ String getSchema(SystemStream systemStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java deleted file mode 100644 index 4b4b8f2..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlCompositeKey.java +++ /dev/null @@ -1,82 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. 
The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql.data; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; - - -/** - * A serializable class that holds different key parts. - */ -public class SamzaSqlCompositeKey implements Serializable { - - @JsonProperty("keyParts") - private ArrayList<Object> keyParts; - private int hashCode; - - @JsonCreator - public SamzaSqlCompositeKey(@JsonProperty("keyParts") List<Object> keyParts) { - this.keyParts = new ArrayList<>(keyParts); - hashCode = keyParts.hashCode(); - } - - /** - * Get the keyParts of all the columns in the relational message. - * @return the keyParts of all the columns - */ - @JsonProperty("keyParts") - public ArrayList<Object> getKeyParts() { - return keyParts; - } - - @Override - public String toString() { - return String.join(", ", Arrays.toString(keyParts.toArray())); - } - - @Override - public int hashCode() { - return hashCode; - } - - @Override - public boolean equals(Object o) { - return this == o || o != null && getClass() == o.getClass() && keyParts.equals(((SamzaSqlCompositeKey) o).keyParts); - } - - /** - * Create the SamzaSqlCompositeKey from the rel message. - * @param message Represents the samza sql rel message. 
- * @param relIdx list of keys in the form of field indices within the rel message. - * @return the composite key of the rel message - */ - public static SamzaSqlCompositeKey createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) { - ArrayList<Object> keyParts = new ArrayList<>(); - for (int idx : relIdx) { - keyParts.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx)); - } - return new SamzaSqlCompositeKey(keyParts); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java index 3ebbb23..55ce7b0 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java @@ -31,12 +31,14 @@ import org.codehaus.jackson.annotate.JsonProperty; /** * Samza sql relational message. Each Samza sql relational message represents a relational row in a table. * Each row of the relational table consists of a primary key and {@link SamzaSqlRelRecord}, which consists of a list - * of column values and the associated column names. + * of column values and the associated column names. Please note that the primary key itself could be a + * {@link SamzaSqlRelRecord}. */ public class SamzaSqlRelMessage implements Serializable { public static final String KEY_NAME = "__key__"; + // key could be a record in itself. 
private final Object key; @JsonProperty("samzaSqlRelRecord") @@ -122,4 +124,52 @@ public class SamzaSqlRelMessage implements Serializable { SamzaSqlRelMessage other = (SamzaSqlRelMessage) obj; return Objects.equals(key, other.key) && Objects.equals(samzaSqlRelRecord, other.samzaSqlRelRecord); } + + @Override + public String toString() { + return "RelMessage: {" + samzaSqlRelRecord + "}"; + } + + /** + * Create composite key from the rel message. + * @param message Represents the samza sql rel message to extract the key values from. + * @param keyValueIdx list of key values in the form of field indices within the rel message. + * @param keyPartNames Represents the key field names. + * @return the composite key of the rel message + */ + public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> keyValueIdx, + List<String> keyPartNames) { + Validate.isTrue(keyValueIdx.size() == keyPartNames.size(), "Key part name and value list sizes are different"); + ArrayList<Object> keyPartValues = new ArrayList<>(); + for (int idx : keyValueIdx) { + keyPartValues.add(message.getSamzaSqlRelRecord().getFieldValues().get(idx)); + } + return new SamzaSqlRelRecord(keyPartNames, keyPartValues); + } + + /** + * Create composite key from the rel message. + * @param message Represents the samza sql rel message to extract the key values and names from. + * @param relIdx list of keys in the form of field indices within the rel message. + * @return the composite key of the rel message + */ + public static SamzaSqlRelRecord createSamzaSqlCompositeKey(SamzaSqlRelMessage message, List<Integer> relIdx) { + return createSamzaSqlCompositeKey(message, relIdx, + getSamzaSqlCompositeKeyFieldNames(message.getSamzaSqlRelRecord().getFieldNames(), relIdx)); + } + + /** + * Get composite key field names. + * @param fieldNames list of field names to extract the key names from. + * @param nameIds indices within the field names. 
+ * @return list of composite key field names + */ + public static List<String> getSamzaSqlCompositeKeyFieldNames(List<String> fieldNames, + List<Integer> nameIds) { + List<String> keyPartNames = new ArrayList<>(); + for (int idx : nameIds) { + keyPartNames.add(fieldNames.get(idx)); + } + return keyPartNames; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java new file mode 100644 index 0000000..e31c2bb --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/ConvertToStringUdf.java @@ -0,0 +1,39 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.fn; + +import org.apache.samza.config.Config; +import org.apache.samza.sql.udfs.ScalarUdf; + + +/** + * UDF that converts an object to its string representation. 
+ */ +public class ConvertToStringUdf implements ScalarUdf<String> { + @Override + public void init(Config udfConfig) { + } + + @Override + public String execute(Object... args) { + return args[0].toString(); + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java new file mode 100644 index 0000000..58b5c99 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/fn/GetSqlFieldUdf.java @@ -0,0 +1,95 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.fn; + +import java.util.List; +import java.util.Map; +import org.apache.commons.lang.Validate; +import org.apache.samza.config.Config; +import org.apache.samza.sql.SamzaSqlRelRecord; +import org.apache.samza.sql.udfs.ScalarUdf; + + +/** + * UDF that extracts a field value from a nested SamzaSqlRelRecord by recursively following a query path. + * Note that the root object must be a SamzaSqlRelRecord. 
+ * + * Syntax for field specification: + * <ul> + * <li> SamzaSqlRelRecord/Map: <code> field.subfield </code> </li> + * <li> Array: <code> field[index] </code> </li> + * <li> Scalar types: <code> field </code> </li> + * </ul> + * + * Example query: <code> pageViewEvent.requestHeader.properties.cookies[3].sessionKey </code> + * + * Above query extracts the sessionKey field from below nested record: + * + * pageViewEvent (SamzaSqlRelRecord) + * - requestHeader (SamzaSqlRelRecord) + * - properties (Map) + * - cookies (Array) + * - sessionKey (Scalar) + * + */ +public class GetSqlFieldUdf implements ScalarUdf<String> { + @Override + public void init(Config udfConfig) { + } + + @Override + public String execute(Object... args) { + Object currentFieldOrValue = args[0]; + Validate.isTrue(currentFieldOrValue == null + || currentFieldOrValue instanceof SamzaSqlRelRecord); + if (currentFieldOrValue != null && args.length > 1) { + String[] fieldNameChain = ((String) args[1]).split("\\."); + for (int i = 0; i < fieldNameChain.length && currentFieldOrValue != null; i++) { + currentFieldOrValue = extractField(fieldNameChain[i], currentFieldOrValue); + } + } + + if (currentFieldOrValue != null) { + return currentFieldOrValue.toString(); + } + + return null; + } + + static Object extractField(String fieldName, Object current) { + if (current instanceof SamzaSqlRelRecord) { + SamzaSqlRelRecord record = (SamzaSqlRelRecord) current; + Validate.isTrue(record.getFieldNames().contains(fieldName), + String.format("Invalid field %s in %s", fieldName, record)); + return record.getField(fieldName).orElse(null); + } else if (current instanceof Map) { + Map map = (Map) current; + Validate.isTrue(map.containsKey(fieldName), String.format("Invalid field %s in %s", fieldName, map)); + return map.get(fieldName); + } else if (current instanceof List && fieldName.endsWith("]")) { + List list = (List) current; + int index = Integer.parseInt(fieldName.substring(fieldName.indexOf("[") + 1, 
fieldName.length() - 1)); + return list.get(index); + } + + throw new IllegalArgumentException(String.format( + "Unsupported accessing operation for data type: %s with field: %s.", current.getClass(), fieldName)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java index 2514d30..80cb789 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java @@ -21,14 +21,15 @@ package org.apache.samza.sql.impl; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; +import org.apache.samza.sql.SamzaSqlRelRecord; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; +import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory; import org.apache.samza.table.descriptors.TableDescriptor; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; -import org.apache.samza.sql.data.SamzaSqlCompositeKey; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOResolverFactory; import org.apache.samza.sql.interfaces.SqlIOConfig; -import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,9 +108,12 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory { TableDescriptor tableDescriptor = null; if (isTable) { String tableId = changeLogStorePrefix + "InputTable-" + name.replace(".", "-").replace("$", "-"); - tableDescriptor 
= new RocksDbTableDescriptor(tableId, KVSerde.of( - new JsonSerdeV2<>(SamzaSqlCompositeKey.class), - new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled(); + SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde = + (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null); + SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde = + (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); + tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)) + .withChangelogEnabled(); } return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor); http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java index f36d990..d83ca7f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java @@ -150,7 +150,7 @@ public class QueryPlanner { public RelDataType getRowType(RelDataTypeFactory typeFactory) { List<RelDataTypeField> fieldsList = new ArrayList<>(); fieldsList.add(new RelDataTypeFieldImpl(SamzaSqlRelMessage.KEY_NAME, 0, - typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true))); + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true))); fieldsList.addAll(relationalSchema.getFieldList()); return new RelRecordType(fieldsList); } http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java index 0939f7b..5f44ff9 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -19,6 +19,7 @@ package org.apache.samza.sql.translator; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -39,19 +40,19 @@ import org.apache.commons.lang.Validate; import org.apache.samza.SamzaException; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.sql.data.SamzaSqlCompositeKey; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; +import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory; import org.apache.samza.table.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.samza.sql.data.SamzaSqlCompositeKey.createSamzaSqlCompositeKey; +import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames; +import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey; /** @@ -103,8 +104,10 @@ class JoinTranslator { isTablePosOnRight ? context.getMessageStream(join.getLeft().getId()) : context.getMessageStream(join.getRight().getId()); - List<String> streamFieldNames = (isTablePosOnRight ? 
join.getLeft() : join.getRight()).getRowType().getFieldNames(); - List<String> tableFieldNames = (isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames(); + List<String> streamFieldNames = + new ArrayList<>((isTablePosOnRight ? join.getLeft() : join.getRight()).getRowType().getFieldNames()); + List<String> tableFieldNames = + new ArrayList<>((isTablePosOnRight ? join.getRight() : join.getLeft()).getRowType().getFieldNames()); Validate.isTrue(streamKeyIds.size() == tableKeyIds.size()); log.info("Joining on the following Stream and Table field(s): "); for (int i = 0; i < streamKeyIds.size(); i++) { @@ -113,23 +116,25 @@ class JoinTranslator { SamzaSqlRelMessageJoinFunction joinFn = new SamzaSqlRelMessageJoinFunction(join.getJoinType(), isTablePosOnRight, streamKeyIds, streamFieldNames, - tableFieldNames); + tableKeyIds, tableFieldNames); - Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class); + SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde = + (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null); SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde = (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); // Always re-partition the messages from the input stream by the composite key and then join the messages - // with the table. + // with the table. For the composite key, provide the corresponding table names in the key instead of using + // the names from the stream as the lookup needs to be done based on what is stored in the local table. 
MessageStream<SamzaSqlRelMessage> outputStream = inputStream - .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds), + .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds, + getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)), m -> m, KVSerde.of(keySerde, valueSerde), intermediateStreamPrefix + "stream_" + joinId) .map(KV::getValue) .join(table, joinFn); - // MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn); context.registerMessageStream(join.getId(), outputStream); } @@ -299,13 +304,13 @@ class JoinTranslator { // Create a table backed by RocksDb store with the fields in the join condition as composite key and relational // message as the value. Send the messages from the input stream denoted as 'table' to the created table store. - Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table = + Table<KV<SamzaSqlRelRecord, SamzaSqlRelMessage>> table = context.getStreamAppDescriptor().getTable(sourceTableConfig.getTableDescriptor().get()); - Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class); + SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde = + (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null); SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde = (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); - // Let's always repartition by the join fields as key before sending the key and value to the table. // We need to repartition the stream denoted as table to ensure that both the stream and table that are joined // have the same partitioning scheme and partition key. 
http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java index 889ea97..d0a3d11 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/SamzaSqlRelMessageJoinFunction.java @@ -25,20 +25,21 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.commons.lang.Validate; import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.StreamTableJoinFunction; -import org.apache.samza.sql.data.SamzaSqlCompositeKey; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.samza.sql.data.SamzaSqlCompositeKey.*; +import static org.apache.samza.sql.data.SamzaSqlRelMessage.createSamzaSqlCompositeKey; +import static org.apache.samza.sql.data.SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames; /** * This class joins incoming {@link SamzaSqlRelMessage} from a stream with the records in a table with the join key - * being {@link SamzaSqlCompositeKey} + * being {@link SamzaSqlRelRecord} */ public class SamzaSqlRelMessageJoinFunction - implements StreamTableJoinFunction<SamzaSqlCompositeKey, SamzaSqlRelMessage, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>, SamzaSqlRelMessage> { + implements StreamTableJoinFunction<SamzaSqlRelRecord, SamzaSqlRelMessage, KV<SamzaSqlRelRecord, SamzaSqlRelMessage>, SamzaSqlRelMessage> { private static final Logger log = LoggerFactory.getLogger(SamzaSqlRelMessageJoinFunction.class); @@ -46,17 +47,20 @@ public class 
SamzaSqlRelMessageJoinFunction private final boolean isTablePosOnRight; private final ArrayList<Integer> streamFieldIds; // Table field names are used in the outer join when the table record is not found. + private final ArrayList<Integer> tableKeyIds; private final ArrayList<String> tableFieldNames; private final ArrayList<String> outFieldNames; SamzaSqlRelMessageJoinFunction(JoinRelType joinRelType, boolean isTablePosOnRight, - List<Integer> streamFieldIds, List<String> streamFieldNames, List<String> tableFieldNames) { + List<Integer> streamFieldIds, List<String> streamFieldNames, List<Integer> tableKeyIds, + List<String> tableFieldNames) { this.joinRelType = joinRelType; this.isTablePosOnRight = isTablePosOnRight; Validate.isTrue((joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnRight) || (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && !isTablePosOnRight) || joinRelType.compareTo(JoinRelType.INNER) == 0); this.streamFieldIds = new ArrayList<>(streamFieldIds); + this.tableKeyIds = new ArrayList<>(tableKeyIds); this.tableFieldNames = new ArrayList<>(tableFieldNames); this.outFieldNames = new ArrayList<>(); if (isTablePosOnRight) { @@ -69,7 +73,7 @@ public class SamzaSqlRelMessageJoinFunction } @Override - public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) { + public SamzaSqlRelMessage apply(SamzaSqlRelMessage message, KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) { if (joinRelType.compareTo(JoinRelType.INNER) == 0 && record == null) { log.debug("Inner Join: Record not found for the message with key: " + getMessageKey(message)); @@ -106,12 +110,13 @@ public class SamzaSqlRelMessageJoinFunction } @Override - public SamzaSqlCompositeKey getMessageKey(SamzaSqlRelMessage message) { - return createSamzaSqlCompositeKey(message, streamFieldIds); + public SamzaSqlRelRecord getMessageKey(SamzaSqlRelMessage message) { + return createSamzaSqlCompositeKey(message, streamFieldIds, + 
getSamzaSqlCompositeKeyFieldNames(tableFieldNames, tableKeyIds)); } @Override - public SamzaSqlCompositeKey getRecordKey(KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record) { + public SamzaSqlRelRecord getRecordKey(KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record) { return record.getKey(); } http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java index 708eb3e..40aa791 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java @@ -19,7 +19,6 @@ package org.apache.samza.sql.avro; -import com.google.common.base.Joiner; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -160,8 +159,7 @@ public class TestAvroRelConversion { record.put("name", "name1"); SamzaSqlRelMessage message = simpleRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)); - LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldValues())); - LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames())); + LOG.info(message.toString()); } @Test @@ -263,8 +261,7 @@ public class TestAvroRelConversion { SamzaSqlRelMessage relMessage = nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)); - LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldValues())); - LOG.info(Joiner.on(",").join(relMessage.getSamzaSqlRelRecord().getFieldNames())); + LOG.info(relMessage.toString()); KV<Object, Object> samzaMessage = nestedRecordAvroRelConverter.convertToSamzaMessage(relMessage); GenericRecord recordPostConversion = (GenericRecord) samzaMessage.getValue(); @@ -321,8 
+318,7 @@ public class TestAvroRelConversion { Arrays.equals(((ByteString) message.getSamzaSqlRelRecord().getField("fixed_value").get()).getBytes(), DEFAULT_TRACKING_ID_BYTES)); - LOG.info(Joiner.on(",").useForNull("null").join(message.getSamzaSqlRelRecord().getFieldValues())); - LOG.info(Joiner.on(",").join(message.getSamzaSqlRelRecord().getFieldNames())); + LOG.info(message.toString()); KV<Object, Object> samzaMessage = complexRecordAvroRelConverter.convertToSamzaMessage(message); GenericRecord record = (GenericRecord) samzaMessage.getValue(); http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java index d0a2f59..a4f3afc 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java @@ -20,7 +20,9 @@ package org.apache.samza.sql.data; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.junit.Assert; import org.junit.Test; @@ -61,4 +63,32 @@ public class TestSamzaSqlRelMessage { Assert.assertNotEquals(message1, message2); Assert.assertNotEquals(message1.hashCode(), message2.hashCode()); } + + @Test + public void testCompositeKeyCreation() { + List<String> keyPartNames = Arrays.asList("kfield1", "kfield2"); + SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); + + SamzaSqlRelRecord relRecord1 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Collections.singletonList(0)); + Assert.assertEquals(relRecord1.getFieldNames().size(), 1); + 
Assert.assertEquals(relRecord1.getFieldNames().get(0), "field1"); + Assert.assertEquals(relRecord1.getFieldValues().get(0), "value1"); + + SamzaSqlRelRecord relRecord2 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Arrays.asList(1, 0), + SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(keyPartNames, Arrays.asList(1, 0))); + Assert.assertEquals(relRecord2.getFieldNames().size(), 2); + Assert.assertEquals(relRecord2.getFieldNames().get(0), "kfield2"); + Assert.assertEquals(relRecord2.getFieldValues().get(0), "value2"); + Assert.assertEquals(relRecord2.getFieldNames().get(1), "kfield1"); + Assert.assertEquals(relRecord2.getFieldValues().get(1), "value1"); + } + + @Test (expected = IllegalArgumentException.class) + public void testCompositeKeyCreationWithInEqualKeyNameValues() { + List<String> keyPartNames = Arrays.asList("kfield1", "kfield2"); + SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); + + SamzaSqlRelRecord relRecord1 = SamzaSqlRelMessage.createSamzaSqlCompositeKey(message, Arrays.asList(1, 0), + SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(keyPartNames, Arrays.asList(1))); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java new file mode 100644 index 0000000..9b80549 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestConvertToStringUdf.java @@ -0,0 +1,62 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.fn; + +import org.junit.Assert; +import org.junit.Test; + + +public class TestConvertToStringUdf { + + private enum LightSwitch { + On, + Off + } + + @Test + public void testConvertIntegerToString() { + ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf(); + Assert.assertEquals(convertToStringUdf.execute(10), "10"); + } + + @Test + public void testConvertLongToString() { + ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf(); + Assert.assertEquals(convertToStringUdf.execute(10000000000L), "10000000000"); + } + + @Test + public void testConvertDoubleToString() { + ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf(); + Assert.assertEquals(convertToStringUdf.execute(10.0000345), "10.0000345"); + } + + @Test + public void testConvertBooleanToString() { + ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf(); + Assert.assertEquals(convertToStringUdf.execute(true), "true"); + } + + @Test + public void testConvertEnumToString() { + ConvertToStringUdf convertToStringUdf = new ConvertToStringUdf(); + Assert.assertEquals(convertToStringUdf.execute(LightSwitch.On), "On"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java ---------------------------------------------------------------------- diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java new file mode 100644 index 0000000..b084fd1 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java @@ -0,0 +1,166 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql.fn; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.samza.sql.SamzaSqlRelRecord; +import org.junit.Assert; +import org.junit.Test; + + +public class TestGetSqlFieldUdf { + static Object createRecord(List<String> fieldNames, int level) { + String fieldName = fieldNames.get(level); + Object child = (level == fieldNames.size() - 1) ? 
"bar" : createRecord(fieldNames, level + 1); + boolean isMap = false; + int arrayIndex = -1; + if (fieldName.startsWith("map:")) { + isMap = true; + fieldName = fieldName.substring(4); // strip "map:" + } else if (fieldName.endsWith("]")) { + arrayIndex = Integer.parseInt(fieldName.substring(fieldName.indexOf("[") + 1, fieldName.length() - 1)); + fieldName = fieldName.substring(0, fieldName.indexOf("[")); + } + + if (isMap) { + Map<String, Object> retMap = new HashMap<>(); + retMap.put(fieldName, child); + return retMap; + } else if (arrayIndex >= 0) { + List list = Arrays.asList(new Object[2 * arrayIndex]); + list.set(arrayIndex, child); + return list; + } else { + return new SamzaSqlRelRecord(Collections.singletonList(fieldName), Collections.singletonList(child)); + } + } + + private SamzaSqlRelRecord createRecord(String path) { + return (SamzaSqlRelRecord) createRecord(Arrays.asList(path.split("\\.")), 0); + } + + @Test + public void testSingleLevel() { + SamzaSqlRelRecord record = createRecord("foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "foo"), "bar"); + } + + @Test + public void testMultiLevel() { + SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "bar.baz.baf.foo"), "bar"); + } + + @Test + public void testNullRecord() { + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(null, "bar.baz.baf.foo"), null); + } + + @Test (expected = NullPointerException.class) + public void testNullFields() { + SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + getSqlFieldUdf.execute(record, null); + } + + @Test(expected = IllegalArgumentException.class) + public void testSingleLevelInvalidField() { + SamzaSqlRelRecord record = createRecord("foo"); + GetSqlFieldUdf 
getSqlFieldUdf = new GetSqlFieldUdf(); + getSqlFieldUdf.execute(record, "bar"); + } + + @Test(expected = IllegalArgumentException.class) + public void testMultiLevelInvalidIntermediateField() { + SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + getSqlFieldUdf.execute(record, "bar.baz.bacon"); + } + + @Test(expected = IllegalArgumentException.class) + public void testMultiLevelInvalidFinalField() { + SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + getSqlFieldUdf.execute(record, "bar.baz.baf.funny"); + } + + @Test(expected = IllegalArgumentException.class) + public void testPathTooDeep() { + SamzaSqlRelRecord record = createRecord("bar.foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + getSqlFieldUdf.execute(record, "bar.baz.baf.funny"); + } + + @Test + public void testMapAtLastField() { + SamzaSqlRelRecord record = createRecord("bar.baz.baf.map:foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "bar.baz.baf.foo"), "bar"); + } + + @Test + public void testMapAtIntermediateFields() { + SamzaSqlRelRecord record = createRecord("bar.map:baz.map:baf.foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "bar.baz.baf.foo"), "bar"); + } + + @Test + public void testMapAtAllIntermediateFields() { + SamzaSqlRelRecord record = createRecord("bar.map:baz.map:baf.map:foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "bar.baz.baf.foo"), "bar"); + } + + @Test + public void testArrayAtLastField() { + SamzaSqlRelRecord record = createRecord("bar.baz.baf.foo[3]"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "bar.baz.baf.foo[3]"), "bar"); + } + + @Test + public void 
testArrayAtIntermediateFields() { + SamzaSqlRelRecord record = createRecord("bar.baz[3].baf[2].foo"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "bar.baz[3].baf[2].foo"), "bar"); + } + + @Test + public void testArrayAtAllIntermediateFields() { + SamzaSqlRelRecord record = createRecord("bar.baz[2].baf[3].foo[5]"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "bar.baz[2].baf[3].foo[5]"), "bar"); + } + + @Test + public void testAllFieldTypes() { + SamzaSqlRelRecord record = createRecord("bar.map:baz.baf.foo[3].fun"); + GetSqlFieldUdf getSqlFieldUdf = new GetSqlFieldUdf(); + Assert.assertEquals(getSqlFieldUdf.execute(record, "bar.baz.baf.foo[3].fun"), "bar"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java index fd811cd..41c809e 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java @@ -188,7 +188,7 @@ public class TestAvroSystemFactory implements SystemFactory { // We send num Messages and an end of stream message following that. List<IncomingMessageEnvelope> envelopes = IntStream.range(curMessages, curMessages + numMessages/4) - .mapToObj(i -> i < numMessages ? new IncomingMessageEnvelope(ssp, null, "key" + i, + .mapToObj(i -> i < numMessages ? 
new IncomingMessageEnvelope(ssp, null, getKey(i, ssp), getData(i, ssp)) : IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp)) .collect(Collectors.toList()); envelopeMap.put(ssp, envelopes); @@ -201,6 +201,10 @@ public class TestAvroSystemFactory implements SystemFactory { return envelopeMap; } + private Object getKey(int index, SystemStreamPartition ssp) { + return "key" + index; + } + private Object getData(int index, SystemStreamPartition ssp) { if (simpleRecordMap.contains(ssp)) { return createSimpleRecord(index); http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java index e54223c..76ebfc2 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java @@ -42,7 +42,6 @@ import org.apache.samza.sql.impl.ConfigBasedUdfResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.system.TestAvroSystemFactory; -import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java index 818e33d..5e7dd4c 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java 
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java @@ -26,16 +26,15 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang.NotImplementedException; import org.apache.samza.config.Config; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; +import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory; import org.apache.samza.table.descriptors.BaseTableDescriptor; import org.apache.samza.table.descriptors.TableDescriptor; -import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.sql.data.SamzaSqlCompositeKey; import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOResolverFactory; -import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.Table; @@ -207,9 +206,11 @@ public class TestIOResolverFactory implements SqlIOResolverFactory { tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size()); } else { String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-"); - tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of( - new JsonSerdeV2<>(SamzaSqlCompositeKey.class), - new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled(); + SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde keySerde = + (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null); + SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde = + (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); + tableDescriptor 
= new RocksDbTableDescriptor(tableId, KVSerde.of(keySerde, valueSerde)).withChangelogEnabled(); } tableDescMap.put(ioName, tableDescriptor); } http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java index 5dd2d21..6362155 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestSamzaSqlRelMessageJoinFunction.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.calcite.rel.core.JoinRelType; import org.apache.samza.operators.KV; -import org.apache.samza.sql.data.SamzaSqlCompositeKey; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.junit.Assert; import org.junit.Test; @@ -45,11 +45,11 @@ public class TestSamzaSqlRelMessageJoinFunction { JoinRelType joinRelType = JoinRelType.INNER; List<Integer> streamKeyIds = Arrays.asList(0, 1); List<Integer> tableKeyIds = Arrays.asList(0, 1); - SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); - KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); + SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); + KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); SamzaSqlRelMessageJoinFunction joinFn = - new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames); + new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, 
streamFieldNames, tableKeyIds, tableFieldNames); SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), @@ -68,11 +68,12 @@ public class TestSamzaSqlRelMessageJoinFunction { JoinRelType joinRelType = JoinRelType.INNER; List<Integer> streamKeyIds = Arrays.asList(0, 2); List<Integer> tableKeyIds = Arrays.asList(0, 2); - SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); - KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); + SamzaSqlRelRecord compositeKey = SamzaSqlRelMessage.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); + KV<SamzaSqlRelRecord, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); SamzaSqlRelMessageJoinFunction joinFn = - new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames); + new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, + tableKeyIds, tableFieldNames); SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), @@ -89,9 +90,10 @@ public class TestSamzaSqlRelMessageJoinFunction { SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); JoinRelType joinRelType = JoinRelType.INNER; List<Integer> streamKeyIds = Arrays.asList(0, 1); + List<Integer> tableKeyIds = Arrays.asList(2, 3); SamzaSqlRelMessageJoinFunction joinFn = - new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames); + new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableKeyIds, tableFieldNames); SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); Assert.assertNull(outMsg); } @@ -101,10 +103,11 @@ public class TestSamzaSqlRelMessageJoinFunction { SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, 
streamFieldValues); JoinRelType joinRelType = JoinRelType.LEFT; List<Integer> streamKeyIds = Arrays.asList(0, 1); + List<Integer> tableKeyIds = Arrays.asList(2, 3); SamzaSqlRelMessageJoinFunction joinFn = new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, - tableFieldNames); + tableKeyIds, tableFieldNames); SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); Assert.assertEquals(outMsg.getSamzaSqlRelRecord().getFieldValues().size(), http://git-wip-us.apache.org/repos/asf/samza/blob/dcd4b558/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java index 6c7a8c2..1b1edbc 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/avro/AvroSchemaGenRelConverter.java @@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.reflect.ReflectData; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.apache.samza.sql.avro.AvroRelConverter; import org.apache.samza.sql.avro.AvroRelSchemaProvider; import org.apache.samza.sql.data.SamzaSqlRelMessage; @@ -52,14 +53,15 @@ public class AvroSchemaGenRelConverter extends AvroRelConverter { @Override public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) { - Schema schema = computeSchema(streamName, relMessage); - return convertToSamzaMessage(relMessage, schema); + Schema payloadSchema = computePayloadSchema(streamName, relMessage); + return convertToSamzaMessage(relMessage, payloadSchema); } - private Schema computeSchema(String streamName, SamzaSqlRelMessage relMessage) { + private Schema 
computePayloadSchema(String streamName, SamzaSqlRelMessage relMessage) { + SamzaSqlRelRecord relRecord = relMessage.getSamzaSqlRelRecord(); List<Schema.Field> keyFields = new ArrayList<>(); - List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames(); - List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues(); + List<String> fieldNames = relRecord.getFieldNames(); + List<Object> values = relRecord.getFieldValues(); for (int index = 0; index < fieldNames.size(); index++) { if (fieldNames.get(index).equals(SamzaSqlRelMessage.KEY_NAME) || values.get(index) == null) {
