SAMZA-484; define serialization for tuples in samza-sql
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a40d5a9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a40d5a9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a40d5a9 Branch: refs/heads/samza-sql Commit: 6a40d5a9a55605d783fe8e34643207acea1dc433 Parents: d4861df Author: Navina Ramesh <[email protected]> Authored: Thu Feb 12 14:30:04 2015 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Feb 12 14:30:04 2015 -0800 ---------------------------------------------------------------------- build.gradle | 10 +- gradle/dependency-versions.gradle | 1 + samza-sql/README | 1 - samza-sql/README.md | 1 + .../org/apache/samza/sql/api/data/Data.java | 54 ++++ .../org/apache/samza/sql/api/data/Schema.java | 55 ++++ .../org/apache/samza/sql/api/data/Tuple.java | 4 +- .../samza/sql/data/IncomingMessageTuple.java | 9 +- .../apache/samza/sql/data/avro/AvroData.java | 262 ++++++++++++++++ .../apache/samza/sql/data/avro/AvroSchema.java | 296 +++++++++++++++++++ .../sql/data/serializers/SqlStringSerde.java | 45 +++ .../data/serializers/SqlStringSerdeFactory.java | 33 +++ .../samza/sql/data/string/StringData.java | 101 +++++++ .../samza/sql/data/string/StringSchema.java | 73 +++++ .../sql/operators/partition/PartitionOp.java | 5 +- 15 files changed, 932 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index e6b10fc..b49c313 100644 --- a/build.gradle +++ b/build.gradle @@ -78,7 +78,7 @@ rat { 'gradlew', 'gradlew.bat', 'samza-test/state/mystore/**', - 'README.md', + '**/README.md', 'RELEASE.md', ] } @@ -249,18 +249,12 @@ project(":samza-yarn_$scalaVersion") { project(":samza-sql_$scalaVersion") { apply plugin: 'java' - configurations { - // Remove 
transitive dependencies from Zookeeper that we don't want. - compile.exclude group: 'javax.jms', module: 'jms' - compile.exclude group: 'com.sun.jdmk', module: 'jmxtools' - compile.exclude group: 'com.sun.jmx', module: 'jmxri' - } - dependencies { compile project(':samza-api') compile project(":samza-core_$scalaVersion") compile project(":samza-kv_$scalaVersion") compile "commons-collections:commons-collections:$commonsCollectionVersion" + compile "org.apache.avro:avro:$avroVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 6f815b2..03c72f8 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -35,4 +35,5 @@ guavaVersion = "17.0" commonsCodecVersion = "1.9" commonsCollectionVersion = "3.2.1" + avroVersion = "1.7.7" } http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/README ---------------------------------------------------------------------- diff --git a/samza-sql/README b/samza-sql/README deleted file mode 100644 index 65b7558..0000000 --- a/samza-sql/README +++ /dev/null @@ -1 +0,0 @@ -samza-sql is an experimental module that is under development (SAMZA-390). \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/README.md ---------------------------------------------------------------------- diff --git a/samza-sql/README.md b/samza-sql/README.md new file mode 100644 index 0000000..598670b --- /dev/null +++ b/samza-sql/README.md @@ -0,0 +1 @@ +samza-sql is an experimental module that is under development (SAMZA-390). 
http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java new file mode 100644 index 0000000..d1b8409 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.api.data; + +import java.util.List; +import java.util.Map; + + +public interface Data { + + Schema schema(); + + Object value(); + + int intValue(); + + long longValue(); + + float floatValue(); + + double doubleValue(); + + boolean booleanValue(); + + String strValue(); + + byte[] bytesValue(); + + List<Object> arrayValue(); + + Map<Object, Object> mapValue(); + + Data getElement(int index); + + Data getFieldData(String fldName); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java new file mode 100644 index 0000000..1e8f192 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.api.data; + +import java.util.Map; + + +public interface Schema { + + enum Type { + INTEGER, + LONG, + FLOAT, + DOUBLE, + BOOLEAN, + STRING, + BYTES, + STRUCT, + ARRAY, + MAP + }; + + Type getType(); + + Schema getElementType(); + + Schema getValueType(); + + Map<String, Schema> getFields(); + + Schema getFieldType(String fldName); + + Data read(Object object); + + Data transform(Data inputData); + + boolean equals(Schema other); +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java index 0c21a53..bc8efcf 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java @@ -32,7 +32,7 @@ public interface Tuple { * * @return Message object in the tuple */ - Object getMessage(); + Data getMessage(); /** * Method to indicate whether the tuple is a delete tuple or an insert tuple @@ -46,7 +46,7 @@ public interface Tuple { * * @return The <code>key</code> of the tuple */ - Object getKey(); + Data getKey(); /** * Get the stream name of the tuple. Note this stream name should be unique in the system. 
http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java index a8a55e2..f868e5c 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java @@ -18,6 +18,7 @@ */ package org.apache.samza.sql.data; +import org.apache.samza.sql.api.data.Data; import org.apache.samza.sql.api.data.EntityName; import org.apache.samza.sql.api.data.Tuple; import org.apache.samza.system.IncomingMessageEnvelope; @@ -52,8 +53,8 @@ public class IncomingMessageTuple implements Tuple { // TODO: the return type should be changed to the generic data type @Override - public Object getMessage() { - return this.imsg.getMessage(); + public Data getMessage() { + return (Data) this.imsg.getMessage(); } @Override @@ -62,8 +63,8 @@ public class IncomingMessageTuple implements Tuple { } @Override - public Object getKey() { - return imsg.getKey(); + public Data getKey() { + return (Data) this.imsg.getKey(); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java new file mode 100644 index 0000000..d040be9 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.data.avro; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericRecord; +import org.apache.samza.sql.api.data.Data; +import org.apache.samza.sql.api.data.Schema; + + +public class AvroData implements Data { + protected final Object datum; + protected final AvroSchema schema; + + private AvroData(AvroSchema schema, Object datum) { + this.datum = datum; + this.schema = schema; + } + + @Override + public Schema schema() { + return this.schema; + } + + @Override + public Object value() { + return this.datum; + } + + @Override + public int intValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public long longValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public float floatValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public double doubleValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public boolean booleanValue() { + throw new 
UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public String strValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public byte[] bytesValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public List<Object> arrayValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Map<Object, Object> mapValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Data getElement(int index) { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Data getFieldData(String fldName) { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + public static AvroData getArray(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.ARRAY) { + throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + @SuppressWarnings("unchecked") + private final GenericArray<Object> array = (GenericArray<Object>) this.datum; + + @Override + public List<Object> arrayValue() { + return this.array; + } + + @Override + public Data getElement(int index) { + return this.schema.getElementType().read(array.get(index)); + } + + }; + } + + public static AvroData getMap(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.MAP) { + throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + @SuppressWarnings("unchecked") + private final Map<Object, Object> map = (Map<Object, Object>) datum; + + @Override + public Map<Object, Object> mapValue() { + return this.map; + } + + @Override + public 
Data getFieldData(String fldName) { + // TODO Auto-generated method stub + return this.schema.getValueType().read(map.get(fldName)); + } + + }; + } + + public static AvroData getStruct(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.STRUCT) { + throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + private final GenericRecord record = (GenericRecord) datum; + + @Override + public Data getFieldData(String fldName) { + // TODO Auto-generated method stub + return this.schema.getFieldType(fldName).read(record.get(fldName)); + } + + }; + } + + public static AvroData getInt(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public int intValue() { + return ((Integer) datum).intValue(); + } + + }; + } + + public static AvroData getLong(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public long longValue() { + return ((Long) datum).longValue(); + } + + }; + } + + public static AvroData getFloat(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) { + throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public float floatValue() { + return ((Float) datum).floatValue(); + } + + }; + } + + public static AvroData getDouble(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public double doubleValue() { + return ((Double) datum).doubleValue(); + } + + }; + } + + public static AvroData getBoolean(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public boolean booleanValue() { + return ((Boolean) datum).booleanValue(); + } + + }; + } + + public static AvroData getString(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public String strValue() { + return ((CharSequence) datum).toString(); + } + + }; + } + + public static AvroData getBytes(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) { + throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public byte[] bytesValue() { + return ((ByteBuffer) datum).array(); + } + + }; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java new file mode 100644 index 0000000..577cf74 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.data.avro; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.Schema.Field; +import org.apache.samza.sql.api.data.Data; +import org.apache.samza.sql.api.data.Schema; + + +public class AvroSchema implements Schema { + + protected final org.apache.avro.Schema avroSchema; + protected final Schema.Type type; + + private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas = + new HashMap<org.apache.avro.Schema.Type, AvroSchema>(); + + static { + primSchemas.put(org.apache.avro.Schema.Type.INT, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) { + @Override + public Data read(Object datum) { + return AvroData.getInt(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.LONG, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) { + @Override + public Data read(Object datum) { + return AvroData.getLong(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.FLOAT, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) { + @Override + public Data read(Object datum) { + return AvroData.getFloat(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.DOUBLE, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) { + @Override + public Data read(Object datum) { + return AvroData.getDouble(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) { + @Override + public Data read(Object datum) { + return AvroData.getBoolean(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.STRING, + new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) { + @Override + public Data read(Object datum) { + return AvroData.getString(this, datum); + } + }); + primSchemas.put(org.apache.avro.Schema.Type.BYTES, + 
new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) { + @Override + public Data read(Object datum) { + return AvroData.getBytes(this, datum); + } + }); + }; + + public static AvroSchema getSchema(final org.apache.avro.Schema schema) { + Schema.Type type = mapType(schema.getType()); + if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) { + return primSchemas.get(schema.getType()); + } + // otherwise, construct the new schema + // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects + switch (type) { + case ARRAY: + return new AvroSchema(schema) { + @Override + public Data transform(Data input) { + // This would get all the elements until the length of the current schema's array length + if (input.schema().getType() != Schema.Type.ARRAY) { + throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " + + input.schema().getType()); + } + if (!input.schema().getElementType().equals(this.getElementType())) { + throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: " + + input.schema().getElementType().getType()); + } + // input type matches array type + return AvroData.getArray(this, input.value()); + } + }; + case MAP: + return new AvroSchema(schema) { + @Override + public Data transform(Data input) { + // This would get all the elements until the length of the current schema's array length + if (input.schema().getType() != Schema.Type.MAP) { + throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " + + input.schema().getType()); + } + if (!input.schema().getValueType().equals(this.getValueType())) { + throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. 
input schema: " + + input.schema().getValueType().getType()); + } + // input type matches map type + return AvroData.getMap(this, input.value()); + } + }; + case STRUCT: + return new AvroSchema(schema) { + @SuppressWarnings("serial") + private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() { + { + for (Field field : schema.getFields()) { + put(field.name(), getSchema(field.schema())); + } + } + }; + + @Override + public Map<String, Schema> getFields() { + return this.fldSchemas; + } + + @Override + public Schema getFieldType(String fldName) { + return this.fldSchemas.get(fldName); + } + + @Override + public Data transform(Data input) { + // This would get all the elements until the length of the current schema's array length + if (input.schema().getType() != Schema.Type.STRUCT) { + throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " + + input.schema().getType()); + } + // Note: this particular transform function only implements "projection to a sub-set" concept. + // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed + for (String fldName : this.fldSchemas.keySet()) { + // check each field schema matches input + Schema fldSchema = this.fldSchemas.get(fldName); + Schema inputFld = input.schema().getFieldType(fldName); + if (!fldSchema.equals(inputFld)) { + throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName + + ". 
input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType()); + } + } + // input type matches struct type + return AvroData.getStruct(this, input.value()); + } + + }; + default: + throw new IllegalArgumentException("Un-recognized complext data type:" + type); + } + } + + private AvroSchema(org.apache.avro.Schema schema) { + this.avroSchema = schema; + this.type = mapType(schema.getType()); + } + + private static Type mapType(org.apache.avro.Schema.Type type) { + switch (type) { + case ARRAY: + return Schema.Type.ARRAY; + case RECORD: + return Schema.Type.STRUCT; + case MAP: + return Schema.Type.MAP; + case INT: + return Schema.Type.INTEGER; + case LONG: + return Schema.Type.LONG; + case BOOLEAN: + return Schema.Type.BOOLEAN; + case FLOAT: + return Schema.Type.FLOAT; + case DOUBLE: + return Schema.Type.DOUBLE; + case STRING: + return Schema.Type.STRING; + case BYTES: + return Schema.Type.BYTES; + default: + throw new IllegalArgumentException("Avro schema: " + type + " is not supported"); + } + } + + @Override + public Type getType() { + return this.type; + } + + @Override + public Schema getElementType() { + if (this.type != Schema.Type.ARRAY) { + throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); + } + return getSchema(this.avroSchema.getElementType()); + } + + @Override + public Schema getValueType() { + if (this.type != Schema.Type.MAP) { + throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type); + } + return getSchema(this.avroSchema.getValueType()); + } + + @Override + public Map<String, Schema> getFields() { + throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); + } + + @Override + public Schema getFieldType(String fldName) { + throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); + } + + @Override + public Data read(Object object) { + if 
(this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) { + return AvroData.getArray(this, object); + } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) { + return AvroData.getMap(this, object); + } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) { + return AvroData.getStruct(this, object); + } + throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported"); + } + + @Override + public Data transform(Data inputData) { + if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP + || inputData.schema().getType() == Schema.Type.STRUCT) { + throw new IllegalArgumentException("Complex schema should have overriden the default transform() function."); + } + if (inputData.schema().getType() != this.type) { + throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type + + ", input type:" + inputData.schema().getType()); + } + return inputData; + } + + @Override + public boolean equals(Schema other) { + // TODO Auto-generated method stub + if (this.type != other.getType()) { + return false; + } + switch (this.type) { + case ARRAY: + // check if element types are the same + return this.getElementType().equals(other.getElementType()); + case MAP: + // check if value types are the same + return this.getValueType().equals(other.getValueType()); + case STRUCT: + // check if the fields schemas in this equals the other + // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform() + for (String fieldName : this.getFields().keySet()) { + if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) { + return false; + } + } + return true; + default: + return true; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java 
---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java new file mode 100644 index 0000000..1f0c3b2 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.data.serializers; + +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.sql.data.string.StringData; + +import java.io.UnsupportedEncodingException; + +public class SqlStringSerde implements Serde<StringData> { + + private final Serde<String> serde; + + public SqlStringSerde(String encoding) { + this.serde = new StringSerde(encoding); + } + + @Override + public StringData fromBytes(byte[] bytes) { + return new StringData(serde.fromBytes(bytes)); + } + + @Override + public byte[] toBytes(StringData object) { + return serde.toBytes(object.strValue()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java new file mode 100644 index 0000000..2564479 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.data.serializers; + + +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; +import org.apache.samza.sql.data.string.StringData; + +public class SqlStringSerdeFactory implements SerdeFactory<StringData> { + @Override + public Serde<StringData> getSerde(String name, Config config) { + return new SqlStringSerde(config.get("encoding", "UTF-8")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java new file mode 100644 index 0000000..b81d9fa --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.sql.data.string;
+
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link Data} implementation for string-typed values. Only {@link #schema()},
+ * {@link #value()} and {@link #strValue()} are supported; every other accessor throws
+ * {@link UnsupportedOperationException}.
+ */
+public class StringData implements Data {
+  private final Object datum;
+  private final Schema schema = new StringSchema();
+
+  /**
+   * @param datum the value to wrap; stored as given
+   */
+  public StringData(Object datum) {
+    this.datum = datum;
+  }
+
+  /** @return the {@link StringSchema} created for this instance */
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  /** @return the wrapped value exactly as it was passed to the constructor */
+  @Override
+  public Object value() {
+    return datum;
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public int intValue() {
+    throw new UnsupportedOperationException("Can't get int value for a string type data");
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public long longValue() {
+    throw new UnsupportedOperationException("Can't get long value for a string type data");
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public float floatValue() {
+    throw new UnsupportedOperationException("Can't get float value for a string type data");
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public double doubleValue() {
+    throw new UnsupportedOperationException("Can't get double value for a string type data");
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public boolean booleanValue() {
+    throw new UnsupportedOperationException("Can't get boolean value for a string type data");
+  }
+
+  /**
+   * @return the string form of the wrapped value; a {@code null} value yields the text
+   *         {@code "null"}
+   */
+  @Override
+  public String strValue() {
+    return datum == null ? "null" : datum.toString();
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public byte[] bytesValue() {
+    throw new UnsupportedOperationException("Can't get bytesValue for a string type data");
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public List<Object> arrayValue() {
+    throw new UnsupportedOperationException("Can't get arrayValue for a string type data");
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public Map<Object, Object> mapValue() {
+    throw new UnsupportedOperationException("Can't get mapValue for a string type data");
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public Data getElement(int index) {
+    throw new UnsupportedOperationException("Can't getElement(index) on a string type data");
+  }
+
+  /** Not supported for string data. */
+  @Override
+  public Data getFieldData(String fldName) {
+    throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data");
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
new file mode 100644
index 0000000..348fc0c
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.string;
+
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+import java.util.Map;
+
+/**
+ * {@link Schema} for string-typed data. The type is always {@code Type.STRING}; the
+ * array, map and struct accessors are unsupported and throw
+ * {@link UnsupportedOperationException}.
+ */
+public class StringSchema implements Schema {
+  // Never reassigned, so declared final; getType() reports this same field.
+  private final Type type = Type.STRING;
+
+  /** @return {@code Type.STRING} */
+  @Override
+  public Type getType() {
+    return this.type;
+  }
+
+  /** Not supported: a string schema has no element type. */
+  @Override
+  public Schema getElementType() {
+    throw new UnsupportedOperationException("Can't getElementType with non-array schema: " + this.type);
+  }
+
+  /** Not supported: a string schema has no value type. */
+  @Override
+  public Schema getValueType() {
+    throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+  }
+
+  /** Not supported: a string schema has no fields. */
+  @Override
+  public Map<String, Schema> getFields() {
+    throw new UnsupportedOperationException("Can't getFields with non-struct schema: " + this.type);
+  }
+
+  /** Not supported: a string schema has no fields. */
+  @Override
+  public Schema getFieldType(String fldName) {
+    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+  }
+
+  /**
+   * Wraps the given object in a {@link StringData}.
+   *
+   * @param object the value to wrap
+   * @return a new {@link StringData} holding {@code object}
+   */
+  @Override
+  public Data read(Object object) {
+    return new StringData(object);
+  }
+
+  /**
+   * Returns the input unchanged when its schema type is STRING.
+   *
+   * @param inputData the data to check
+   * @return {@code inputData} itself
+   * @throws IllegalArgumentException if the input's schema type differs from this schema's type
+   */
+  @Override
+  public Data transform(Data inputData) {
+    if (inputData.schema().getType() != this.type) {
+      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+          + ", input type:" + inputData.schema().getType());
+    }
+    return inputData;
+  }
+
+  /**
+   * Compares schemas by type only.
+   *
+   * @param other the schema to compare against; may be {@code null}
+   * @return {@code true} when {@code other} is non-null and its type is STRING
+   */
+  @Override
+  public boolean equals(Schema other) {
+    // A null schema is simply "not equal" rather than a NullPointerException.
+    return other != null && other.getType() == this.type;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a40d5a9/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
index 7921d4f..986d688 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -82,9 +82,8 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator {
 
   @Override
   public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
-    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey(),
-        null /* TODO: when merge with Schema API changes, use: tuple
-        .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, tuple.getMessage()));
+    collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(),
+        tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getMessage().value()));
   }
 
 }
