SAMZA-1050: Make samza-operator independent of avro version. Prep for merging the samza-operator APIs to master: removing the direct dependency on avro.
Author: Yi Pan (Data Infrastructure) <[email protected]> Reviewers: Jagadish <[email protected]>, Prateek Maheshiwari <[email protected]> Closes #22 from nickpan47/SAMZA-1050 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a7de7359 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a7de7359 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a7de7359 Branch: refs/heads/samza-sql Commit: a7de73594b3b85b3ad4132cc33dff1f5efcd95bf Parents: 1dac25e Author: Yi Pan (Data Infrastructure) <[email protected]> Authored: Tue Nov 22 15:39:27 2016 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Nov 22 15:39:27 2016 -0800 ---------------------------------------------------------------------- build.gradle | 18 +- .../apache/samza/operators/api/data/Data.java | 57 ---- .../apache/samza/operators/api/data/Schema.java | 58 ---- .../operators/impl/data/avro/AvroData.java | 262 ---------------- .../operators/impl/data/avro/AvroSchema.java | 296 ------------------- .../impl/data/serializers/SqlAvroSerde.java | 108 ------- .../data/serializers/SqlAvroSerdeFactory.java | 40 --- .../impl/data/serializers/SqlStringSerde.java | 44 --- .../data/serializers/SqlStringSerdeFactory.java | 33 --- .../operators/impl/data/string/StringData.java | 101 ------- .../impl/data/string/StringSchema.java | 73 ----- .../impl/data/serializers/SqlAvroSerdeTest.java | 103 ------- .../samza/task/BroadcastOperatorTask.java | 14 +- .../org/apache/samza/sql/calcite/data/Data.java | 57 ++++ .../apache/samza/sql/calcite/data/Schema.java | 58 ++++ .../samza/sql/calcite/data/avro/AvroData.java | 262 ++++++++++++++++ .../samza/sql/calcite/data/avro/AvroSchema.java | 296 +++++++++++++++++++ .../calcite/data/serializers/SqlAvroSerde.java | 108 +++++++ .../data/serializers/SqlAvroSerdeFactory.java | 40 +++ .../data/serializers/SqlStringSerde.java | 44 +++ .../data/serializers/SqlStringSerdeFactory.java | 33 +++ 
.../sql/calcite/data/string/StringData.java | 101 +++++++ .../sql/calcite/data/string/StringSchema.java | 73 +++++ .../data/serializers/SqlAvroSerdeTest.java | 103 +++++++ 24 files changed, 1194 insertions(+), 1188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 28c2dcf..1b3c278 100644 --- a/build.gradle +++ b/build.gradle @@ -278,7 +278,12 @@ project(":samza-yarn_$scalaVersion") { // Force scala joint compilation sourceSets.main.scala.srcDir "src/main/java" + sourceSets.test.scala.srcDir "src/test/java" + + // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting + // tasks.compileTestJava.enabled = false sourceSets.main.java.srcDirs = [] + sourceSets.test.java.srcDirs = [] dependencies { compile project(':samza-api') @@ -304,6 +309,10 @@ project(":samza-yarn_$scalaVersion") { // Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4. 
exclude module: 'zookeeper' } + compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") { + exclude module: 'slf4j-log4j12' + exclude module: 'servlet-api' + } compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") { exclude module: 'scala-compiler' exclude module: 'slf4j-api' @@ -317,6 +326,7 @@ project(":samza-yarn_$scalaVersion") { testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile project(":samza-core_$scalaVersion").sourceSets.test.output + testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" } repositories { @@ -346,7 +356,6 @@ if (JavaVersion.current().isJava8Compatible()) { compile project(":samza-core_$scalaVersion") compile "commons-collections:commons-collections:$commonsCollectionVersion" compile "org.apache.commons:commons-lang3:$commonsLang3Version" - compile "org.apache.avro:avro:$avroVersion" compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion" testCompile project(":samza-api").sourceSets.test.output @@ -368,6 +377,7 @@ if (JavaVersion.current().isJava8Compatible()) { dependencies { compile project(":samza-operator") + compile "org.apache.avro:avro:$avroVersion" compile "org.apache.calcite:calcite-core:$calciteVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" @@ -470,7 +480,13 @@ project(":samza-kv-inmemory_$scalaVersion") { project(":samza-kv-rocksdb_$scalaVersion") { apply plugin: 'scala' + // Force scala joint compilation + sourceSets.main.scala.srcDir "src/main/java" sourceSets.test.scala.srcDir "src/test/java" + + // Disable the Javac compiler by forcing joint compilation by scalac. 
This is equivalent to setting + // tasks.compileTestJava.enabled = false + sourceSets.main.java.srcDirs = [] sourceSets.test.java.srcDirs = [] dependencies { http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java deleted file mode 100644 index 69a3bee..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Data.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.operators.api.data; - -import java.util.List; -import java.util.Map; - - -/** - * A generic data interface that allows to implement data access / deserialization w/ {@link Schema} - */ -public interface Data { - - Schema schema(); - - Object value(); - - int intValue(); - - long longValue(); - - float floatValue(); - - double doubleValue(); - - boolean booleanValue(); - - String strValue(); - - byte[] bytesValue(); - - List<Object> arrayValue(); - - Map<Object, Object> mapValue(); - - Data getElement(int index); - - Data getFieldData(String fldName); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java b/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java deleted file mode 100644 index dc3f8f4..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/api/data/Schema.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.operators.api.data; - -import java.util.Map; - - -/** - * This defines an interface for generic schema access methods - */ -public interface Schema { - - enum Type { - INTEGER, - LONG, - FLOAT, - DOUBLE, - BOOLEAN, - STRING, - BYTES, - STRUCT, - ARRAY, - MAP - }; - - Type getType(); - - Schema getElementType(); - - Schema getValueType(); - - Map<String, Schema> getFields(); - - Schema getFieldType(String fldName); - - Data read(Object object); - - Data transform(Data inputData); - - boolean equals(Schema other); -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java deleted file mode 100644 index e4f5d79..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroData.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.operators.impl.data.avro; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import org.apache.avro.generic.GenericArray; -import org.apache.avro.generic.GenericRecord; -import org.apache.samza.operators.api.data.Data; -import org.apache.samza.operators.api.data.Schema; - - -public class AvroData implements Data { - protected final Object datum; - protected final AvroSchema schema; - - private AvroData(AvroSchema schema, Object datum) { - this.datum = datum; - this.schema = schema; - } - - @Override - public Schema schema() { - return this.schema; - } - - @Override - public Object value() { - return this.datum; - } - - @Override - public int intValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public long longValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public float floatValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public double doubleValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public boolean booleanValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public String strValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public byte[] bytesValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public List<Object> arrayValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Map<Object, Object> mapValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Data getElement(int index) { - throw new 
UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Data getFieldData(String fldName) { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - public static AvroData getArray(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.ARRAY) { - throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - @SuppressWarnings("unchecked") - private final GenericArray<Object> array = (GenericArray<Object>) this.datum; - - @Override - public List<Object> arrayValue() { - return this.array; - } - - @Override - public Data getElement(int index) { - return this.schema.getElementType().read(array.get(index)); - } - - }; - } - - public static AvroData getMap(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.MAP) { - throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - @SuppressWarnings("unchecked") - private final Map<Object, Object> map = (Map<Object, Object>) datum; - - @Override - public Map<Object, Object> mapValue() { - return this.map; - } - - @Override - public Data getFieldData(String fldName) { - // TODO Auto-generated method stub - return this.schema.getValueType().read(map.get(fldName)); - } - - }; - } - - public static AvroData getStruct(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.STRUCT) { - throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - private final GenericRecord record = (GenericRecord) datum; - - @Override - public Data getFieldData(String fldName) { - // TODO Auto-generated method stub - return this.schema.getFieldType(fldName).read(record.get(fldName)); - } - - }; - } - - public static AvroData 
getInt(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public int intValue() { - return ((Integer) datum).intValue(); - } - - }; - } - - public static AvroData getLong(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public long longValue() { - return ((Long) datum).longValue(); - } - - }; - } - - public static AvroData getFloat(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public float floatValue() { - return ((Float) datum).floatValue(); - } - - }; - } - - public static AvroData getDouble(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public double doubleValue() { - return ((Double) datum).doubleValue(); - } - - }; - } - - public static AvroData getBoolean(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) { - throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public boolean booleanValue() { - return ((Boolean) datum).booleanValue(); - } - - }; - } - - public static AvroData getString(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public String strValue() { - return ((CharSequence) datum).toString(); - } - - }; - } - - public static AvroData getBytes(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public byte[] bytesValue() { - return ((ByteBuffer) datum).array(); - } - - }; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java deleted file mode 100644 index c04e4f6..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/avro/AvroSchema.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.impl.data.avro; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.avro.Schema.Field; -import org.apache.samza.operators.api.data.Data; -import org.apache.samza.operators.api.data.Schema; - - -public class AvroSchema implements Schema { - - protected final org.apache.avro.Schema avroSchema; - protected final Schema.Type type; - - private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas = - new HashMap<org.apache.avro.Schema.Type, AvroSchema>(); - - static { - primSchemas.put(org.apache.avro.Schema.Type.INT, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) { - @Override - public Data read(Object datum) { - return AvroData.getInt(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.LONG, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) { - @Override - public Data read(Object datum) { - return AvroData.getLong(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.FLOAT, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) { - @Override - public Data read(Object datum) { - return AvroData.getFloat(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.DOUBLE, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) { - @Override - public 
Data read(Object datum) { - return AvroData.getDouble(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) { - @Override - public Data read(Object datum) { - return AvroData.getBoolean(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.STRING, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) { - @Override - public Data read(Object datum) { - return AvroData.getString(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.BYTES, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) { - @Override - public Data read(Object datum) { - return AvroData.getBytes(this, datum); - } - }); - }; - - public static AvroSchema getSchema(final org.apache.avro.Schema schema) { - Schema.Type type = mapType(schema.getType()); - if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) { - return primSchemas.get(schema.getType()); - } - // otherwise, construct the new schema - // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects - switch (type) { - case ARRAY: - return new AvroSchema(schema) { - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.ARRAY) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - if (!input.schema().getElementType().equals(this.getElementType())) { - throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. 
input schema: " - + input.schema().getElementType().getType()); - } - // input type matches array type - return AvroData.getArray(this, input.value()); - } - }; - case MAP: - return new AvroSchema(schema) { - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.MAP) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - if (!input.schema().getValueType().equals(this.getValueType())) { - throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: " - + input.schema().getValueType().getType()); - } - // input type matches map type - return AvroData.getMap(this, input.value()); - } - }; - case STRUCT: - return new AvroSchema(schema) { - @SuppressWarnings("serial") - private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() { - { - for (Field field : schema.getFields()) { - put(field.name(), getSchema(field.schema())); - } - } - }; - - @Override - public Map<String, Schema> getFields() { - return this.fldSchemas; - } - - @Override - public Schema getFieldType(String fldName) { - return this.fldSchemas.get(fldName); - } - - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.STRUCT) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - // Note: this particular transform function only implements "projection to a sub-set" concept. 
- // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed - for (String fldName : this.fldSchemas.keySet()) { - // check each field schema matches input - Schema fldSchema = this.fldSchemas.get(fldName); - Schema inputFld = input.schema().getFieldType(fldName); - if (!fldSchema.equals(inputFld)) { - throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName - + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType()); - } - } - // input type matches struct type - return AvroData.getStruct(this, input.value()); - } - - }; - default: - throw new IllegalArgumentException("Un-recognized complext data type:" + type); - } - } - - private AvroSchema(org.apache.avro.Schema schema) { - this.avroSchema = schema; - this.type = mapType(schema.getType()); - } - - private static Type mapType(org.apache.avro.Schema.Type type) { - switch (type) { - case ARRAY: - return Schema.Type.ARRAY; - case RECORD: - return Schema.Type.STRUCT; - case MAP: - return Schema.Type.MAP; - case INT: - return Schema.Type.INTEGER; - case LONG: - return Schema.Type.LONG; - case BOOLEAN: - return Schema.Type.BOOLEAN; - case FLOAT: - return Schema.Type.FLOAT; - case DOUBLE: - return Schema.Type.DOUBLE; - case STRING: - return Schema.Type.STRING; - case BYTES: - return Schema.Type.BYTES; - default: - throw new IllegalArgumentException("Avro schema: " + type + " is not supported"); - } - } - - @Override - public Type getType() { - return this.type; - } - - @Override - public Schema getElementType() { - if (this.type != Schema.Type.ARRAY) { - throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); - } - return getSchema(this.avroSchema.getElementType()); - } - - @Override - public Schema getValueType() { - if (this.type != Schema.Type.MAP) { - throw new UnsupportedOperationException("Can't getValueType 
with non-map schema: " + this.type); - } - return getSchema(this.avroSchema.getValueType()); - } - - @Override - public Map<String, Schema> getFields() { - throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); - } - - @Override - public Schema getFieldType(String fldName) { - throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); - } - - @Override - public Data read(Object object) { - if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) { - return AvroData.getArray(this, object); - } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) { - return AvroData.getMap(this, object); - } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) { - return AvroData.getStruct(this, object); - } - throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported"); - } - - @Override - public Data transform(Data inputData) { - if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP - || inputData.schema().getType() == Schema.Type.STRUCT) { - throw new IllegalArgumentException("Complex schema should have overriden the default transform() function."); - } - if (inputData.schema().getType() != this.type) { - throw new IllegalArgumentException("Can't transform a mismatched primitive type. 
this type:" + this.type - + ", input type:" + inputData.schema().getType()); - } - return inputData; - } - - @Override - public boolean equals(Schema other) { - // TODO Auto-generated method stub - if (this.type != other.getType()) { - return false; - } - switch (this.type) { - case ARRAY: - // check if element types are the same - return this.getElementType().equals(other.getElementType()); - case MAP: - // check if value types are the same - return this.getValueType().equals(other.getValueType()); - case STRUCT: - // check if the fields schemas in this equals the other - // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform() - for (String fieldName : this.getFields().keySet()) { - if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) { - return false; - } - } - return true; - default: - return true; - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java deleted file mode 100644 index 2432aca..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerde.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl.data.serializers; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.samza.SamzaException; -import org.apache.samza.serializers.Serde; -import org.apache.samza.operators.impl.data.avro.AvroData; -import org.apache.samza.operators.impl.data.avro.AvroSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -public class SqlAvroSerde implements Serde<AvroData> { - private static Logger log = LoggerFactory.getLogger(SqlAvroSerde.class); - - private final Schema avroSchema; - private final GenericDatumReader<GenericRecord> reader; - private final GenericDatumWriter<Object> writer; - - public SqlAvroSerde(Schema avroSchema) { - this.avroSchema = avroSchema; - this.reader = new GenericDatumReader<GenericRecord>(avroSchema); - this.writer = new GenericDatumWriter<Object>(avroSchema); - } - - @Override - public AvroData fromBytes(byte[] bytes) { - GenericRecord data; - - try { - data = reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null)); - return getAvroData(data, avroSchema); - } catch (IOException e) { - String errMsg = "Cannot decode message."; - log.error(errMsg, e); - throw new SamzaException(errMsg, e); - } - } - - @Override 
- public byte[] toBytes(AvroData object) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Encoder encoder = new BinaryEncoder(out); - - try { - writer.write(object.value(), encoder); - encoder.flush(); - return out.toByteArray(); - } catch (IOException e) { - String errMsg = "Cannot perform Avro binary encode."; - log.error(errMsg, e); - throw new SamzaException(errMsg, e); - } - } - - private AvroData getAvroData(GenericRecord data, Schema type){ - AvroSchema schema = AvroSchema.getSchema(type); - switch (type.getType()){ - case RECORD: - return AvroData.getStruct(schema, data); - case ARRAY: - return AvroData.getArray(schema, data); - case MAP: - return AvroData.getMap(schema, data); - case INT: - return AvroData.getInt(schema, data); - case LONG: - return AvroData.getLong(schema, data); - case BOOLEAN: - return AvroData.getBoolean(schema, data); - case FLOAT: - return AvroData.getFloat(schema, data); - case DOUBLE: - return AvroData.getDouble(schema, data); - case STRING: - return AvroData.getString(schema, data); - case BYTES: - return AvroData.getBytes(schema, data); - default: - throw new IllegalArgumentException("Avro schema: " + type + " is not supported"); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java deleted file mode 100644 index edd8859..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl.data.serializers; - -import org.apache.avro.Schema; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerdeFactory; -import org.apache.samza.operators.impl.data.avro.AvroData; - -public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> { - public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema"; - - @Override - public Serde<AvroData> getSerde(String name, Config config) { - String avroSchemaStr = config.get(String.format(PROP_AVRO_SCHEMA, name)); - if (avroSchemaStr == null || avroSchemaStr.isEmpty()) { - throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'."); - } - - return new SqlAvroSerde(Schema.parse(avroSchemaStr)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java deleted 
file mode 100644 index 1267ab6..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerde.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.impl.data.serializers; - -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.operators.impl.data.string.StringData; - - -public class SqlStringSerde implements Serde<StringData> { - - private final Serde<String> serde; - - public SqlStringSerde(String encoding) { - this.serde = new StringSerde(encoding); - } - - @Override - public StringData fromBytes(byte[] bytes) { - return new StringData(serde.fromBytes(bytes)); - } - - @Override - public byte[] toBytes(StringData object) { - return serde.toBytes(object.strValue()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java deleted file mode 100644 index 3b6a3e0..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/serializers/SqlStringSerdeFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.operators.impl.data.serializers; - - -import org.apache.samza.config.Config; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerdeFactory; -import org.apache.samza.operators.impl.data.string.StringData; - -public class SqlStringSerdeFactory implements SerdeFactory<StringData> { - @Override - public Serde<StringData> getSerde(String name, Config config) { - return new SqlStringSerde(config.get("encoding", "UTF-8")); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java deleted file mode 100644 index 86e9917..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringData.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.operators.impl.data.string; - -import org.apache.samza.operators.api.data.Data; -import org.apache.samza.operators.api.data.Schema; - -import java.util.List; -import java.util.Map; - -public class StringData implements Data { - private final Object datum; - private final Schema schema; - - public StringData(Object datum) { - this.datum = datum; - this.schema = new StringSchema(); - } - - @Override - public Schema schema() { - return this.schema; - } - - @Override - public Object value() { - return this.datum; - } - - @Override - public int intValue() { - throw new UnsupportedOperationException("Can't get int value for a string type data"); - } - - @Override - public long longValue() { - throw new UnsupportedOperationException("Can't get long value for a string type data"); - } - - @Override - public float floatValue() { - throw new UnsupportedOperationException("Can't get float value for a string type data"); - } - - @Override - public double doubleValue() { - throw new UnsupportedOperationException("Can't get double value for a string type data"); - } - - @Override - public boolean booleanValue() { - throw new UnsupportedOperationException("Can't get boolean value for a string type data"); - } - - @Override - public String strValue() { - return String.valueOf(datum); - } - - @Override - public byte[] bytesValue() { - throw new UnsupportedOperationException("Can't get bytesValue for a string type data"); - } - - @Override - public List<Object> arrayValue() { - throw new UnsupportedOperationException("Can't get arrayValue for a string type data"); - } - - @Override - public Map<Object, Object> mapValue() { - throw new UnsupportedOperationException("Can't get mapValue for a string type data"); - } - - @Override - public Data getElement(int index) { - throw new UnsupportedOperationException("Can't getElement(index) on a string type data"); - } - - @Override - public Data getFieldData(String fldName) { - throw new 
UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java deleted file mode 100644 index b19dfeb..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/data/string/StringSchema.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.operators.impl.data.string; - -import org.apache.samza.operators.api.data.Data; -import org.apache.samza.operators.api.data.Schema; - -import java.util.Map; - -public class StringSchema implements Schema { - private Type type = Type.STRING; - - @Override - public Type getType() { - return Type.STRING; - } - - @Override - public Schema getElementType() { - throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); - } - - @Override - public Schema getValueType() { - throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type); - } - - @Override - public Map<String, Schema> getFields() { - throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); - } - - @Override - public Schema getFieldType(String fldName) { - throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); - } - - @Override - public Data read(Object object) { - return new StringData(object); - } - - @Override - public Data transform(Data inputData) { - if (inputData.schema().getType() != this.type) { - throw new IllegalArgumentException("Can't transform a mismatched primitive type. 
this type:" + this.type - + ", input type:" + inputData.schema().getType()); - } - return inputData; - } - - @Override - public boolean equals(Schema other) { - return other.getType() == this.type; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java deleted file mode 100644 index 5aa28bb..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.impl.data.serializers; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DatumWriter; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.serializers.Serde; -import org.apache.samza.operators.impl.data.avro.AvroData; -import org.junit.Assert; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -public class SqlAvroSerdeTest { - public static final String ORDER_SCHEMA = "{\"namespace\": \"org.apache.samza.operators\",\n"+ - " \"type\": \"record\",\n"+ - " \"name\": \"Order\",\n"+ - " \"fields\": [\n"+ - " {\"name\": \"id\", \"type\": \"int\"},\n"+ - " {\"name\": \"product\", \"type\": \"string\"},\n"+ - " {\"name\": \"quantity\", \"type\": \"int\"}\n"+ - " ]\n"+ - "}"; - - public static Schema orderSchema = Schema.parse(ORDER_SCHEMA); - - private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", sqlAvroSerdeTestConfig()); - - @Test - public void testSqlAvroSerdeDeserialization() throws IOException { - AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); - - Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT); - Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING); - } - - @Test - public void testSqlAvroSerialization() 
throws IOException { - AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); - @SuppressWarnings("unchecked") - byte[] encodedDatum = serde.toBytes(decodedDatumOriginal); - - AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum); - - Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRUCT); - Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.INTEGER); - Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.operators.api.data.Schema.Type.STRING); - } - - private static Config sqlAvroSerdeTestConfig(){ - Map<String, String> config = new HashMap<String, String>(); - config.put("serializers.sqlAvro.schema", ORDER_SCHEMA); - - return new MapConfig(config); - } - - private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) throws IOException { - DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - BinaryEncoder encoder = new BinaryEncoder(output); - writer.write(datum, encoder); - encoder.flush(); - - return output.toByteArray(); - } - - private static GenericRecord sampleOrderRecord(){ - GenericData.Record datum = new GenericData.Record(orderSchema); - datum.put("id", 1); - datum.put("product", "paint"); - datum.put("quantity", 3); - - return datum; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java index 493a688..a6d57da 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java +++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java @@ -19,11 +19,10 @@ package org.apache.samza.task; -import org.apache.avro.generic.GenericRecord; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.Windows; import org.apache.samza.operators.TriggerBuilder; +import org.apache.samza.operators.Windows; import org.apache.samza.operators.data.IncomingSystemMessage; import org.apache.samza.operators.data.Offset; import org.apache.samza.operators.task.StreamOperatorTask; @@ -82,16 +81,7 @@ public class BroadcastOperatorTask implements StreamOperatorTask { } JsonMessage getInputMessage(IncomingSystemMessage m1) { - return new JsonMessage( - m1.getKey().toString(), - (MessageType) m1.getMessage(), - m1.getOffset(), - this.getEventTime((GenericRecord)m1.getMessage()), - m1.getSystemStreamPartition()); - } - - long getEventTime(GenericRecord msg) { - return (Long) msg.get("event_time"); + return (JsonMessage) m1.getMessage(); } boolean myFilter1(JsonMessage m1) { http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java new file mode 100644 index 0000000..7d6ee79 --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Data.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.calcite.data; + +import java.util.List; +import java.util.Map; + + +/** + * A generic data interface that allows to implement data access / deserialization w/ {@link Schema} + */ +public interface Data { + + Schema schema(); + + Object value(); + + int intValue(); + + long longValue(); + + float floatValue(); + + double doubleValue(); + + boolean booleanValue(); + + String strValue(); + + byte[] bytesValue(); + + List<Object> arrayValue(); + + Map<Object, Object> mapValue(); + + Data getElement(int index); + + Data getFieldData(String fldName); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java new file mode 100644 index 0000000..e2a79cf --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/Schema.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.calcite.data; + +import java.util.Map; + + +/** + * This defines an interface for generic schema access methods + */ +public interface Schema { + + enum Type { + INTEGER, + LONG, + FLOAT, + DOUBLE, + BOOLEAN, + STRING, + BYTES, + STRUCT, + ARRAY, + MAP + }; + + Type getType(); + + Schema getElementType(); + + Schema getValueType(); + + Map<String, Schema> getFields(); + + Schema getFieldType(String fldName); + + Data read(Object object); + + Data transform(Data inputData); + + boolean equals(Schema other); +} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java new file mode 100644 index 0000000..91d26a2 --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroData.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.sql.calcite.data.avro; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericRecord; +import org.apache.samza.sql.calcite.data.Data; +import org.apache.samza.sql.calcite.data.Schema; + + +public class AvroData implements Data { + protected final Object datum; + protected final AvroSchema schema; + + private AvroData(AvroSchema schema, Object datum) { + this.datum = datum; + this.schema = schema; + } + + @Override + public Schema schema() { + return this.schema; + } + + @Override + public Object value() { + return this.datum; + } + + @Override + public int intValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public long longValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public float floatValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public double doubleValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public boolean booleanValue() { + throw new 
UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public String strValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public byte[] bytesValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public List<Object> arrayValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Map<Object, Object> mapValue() { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Data getElement(int index) { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + @Override + public Data getFieldData(String fldName) { + throw new UnsupportedOperationException("Can't get value for an unknown data type."); + } + + public static AvroData getArray(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.ARRAY) { + throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + @SuppressWarnings("unchecked") + private final GenericArray<Object> array = (GenericArray<Object>) this.datum; + + @Override + public List<Object> arrayValue() { + return this.array; + } + + @Override + public Data getElement(int index) { + return this.schema.getElementType().read(array.get(index)); + } + + }; + } + + public static AvroData getMap(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.MAP) { + throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + @SuppressWarnings("unchecked") + private final Map<Object, Object> map = (Map<Object, Object>) datum; + + @Override + public Map<Object, Object> mapValue() { + return this.map; + } + + @Override + public 
Data getFieldData(String fldName) { + // TODO Auto-generated method stub + return this.schema.getValueType().read(map.get(fldName)); + } + + }; + } + + public static AvroData getStruct(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.STRUCT) { + throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType()); + } + return new AvroData(schema, datum) { + private final GenericRecord record = (GenericRecord) datum; + + @Override + public Data getFieldData(String fldName) { + // TODO Auto-generated method stub + return this.schema.getFieldType(fldName).read(record.get(fldName)); + } + + }; + } + + public static AvroData getInt(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public int intValue() { + return ((Integer) datum).intValue(); + } + + }; + } + + public static AvroData getLong(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public long longValue() { + return ((Long) datum).longValue(); + } + + }; + } + + public static AvroData getFloat(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) { + throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public float floatValue() { + return ((Float) datum).floatValue(); + } + + }; + } + + public static AvroData getDouble(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public double doubleValue() { + return ((Double) datum).doubleValue(); + } + + }; + } + + public static AvroData getBoolean(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public boolean booleanValue() { + return ((Boolean) datum).booleanValue(); + } + + }; + } + + public static AvroData getString(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) { + throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public String strValue() { + return ((CharSequence) datum).toString(); + } + + }; + } + + public static AvroData getBytes(AvroSchema schema, Object datum) { + if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) { + throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: " + + datum.getClass().getName()); + } + return new AvroData(schema, datum) { + @Override + public byte[] bytesValue() { + return ((ByteBuffer) datum).array(); + } + + }; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java new file mode 100644 index 0000000..c3bb150 --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/avro/AvroSchema.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.samza.sql.calcite.data.avro;

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema.Field;
import org.apache.samza.sql.calcite.data.Data;
import org.apache.samza.sql.calcite.data.Schema;


/**
 * {@link Schema} implementation that wraps an Avro schema.
 *
 * <p>Instances are obtained through {@link #getSchema(org.apache.avro.Schema)}. Primitive Avro types
 * (INT, LONG, FLOAT, DOUBLE, BOOLEAN, STRING, BYTES) are served from a table of shared instances that
 * is filled once in the static initializer; ARRAY, MAP and RECORD schemas get a new instance on every
 * call. Any other Avro type is rejected with an {@link IllegalArgumentException}.
 */
public class AvroSchema implements Schema {

  protected final org.apache.avro.Schema avroSchema;
  protected final Schema.Type type;

  // Shared instances for the primitive types; populated once below and only read afterwards.
  private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
      new HashMap<org.apache.avro.Schema.Type, AvroSchema>();

  static {
    primSchemas.put(org.apache.avro.Schema.Type.INT,
        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
          @Override
          public Data read(Object datum) {
            return AvroData.getInt(this, datum);
          }
        });
    primSchemas.put(org.apache.avro.Schema.Type.LONG,
        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
          @Override
          public Data read(Object datum) {
            return AvroData.getLong(this, datum);
          }
        });
    primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
          @Override
          public Data read(Object datum) {
            return AvroData.getFloat(this, datum);
          }
        });
    primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
          @Override
          public Data read(Object datum) {
            return AvroData.getDouble(this, datum);
          }
        });
    primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
          @Override
          public Data read(Object datum) {
            return AvroData.getBoolean(this, datum);
          }
        });
    primSchemas.put(org.apache.avro.Schema.Type.STRING,
        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
          @Override
          public Data read(Object datum) {
            return AvroData.getString(this, datum);
          }
        });
    primSchemas.put(org.apache.avro.Schema.Type.BYTES,
        new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
          @Override
          public Data read(Object datum) {
            return AvroData.getBytes(this, datum);
          }
        });
  }

  /**
   * Returns the {@link AvroSchema} wrapper for an Avro schema.
   *
   * @param schema the Avro schema to wrap
   * @return the shared instance for a primitive type, or a new instance for ARRAY, MAP and RECORD
   * @throws IllegalArgumentException if the Avro type has no mapping in {@code mapType}
   */
  public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
    Schema.Type type = mapType(schema.getType());
    if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
      return primSchemas.get(schema.getType());
    }
    // otherwise, construct the new schema
    // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
    switch (type) {
      case ARRAY:
        return new AvroSchema(schema) {
          @Override
          public Data transform(Data input) {
            // Only an array whose element schema equals ours can be re-wrapped under this schema.
            if (input.schema().getType() != Schema.Type.ARRAY) {
              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
                  + input.schema().getType());
            }
            if (!input.schema().getElementType().equals(this.getElementType())) {
              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
                  + input.schema().getElementType().getType());
            }
            // input type matches array type
            return AvroData.getArray(this, input.value());
          }
        };
      case MAP:
        return new AvroSchema(schema) {
          @Override
          public Data transform(Data input) {
            // Only a map whose value schema equals ours can be re-wrapped under this schema.
            if (input.schema().getType() != Schema.Type.MAP) {
              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
                  + input.schema().getType());
            }
            if (!input.schema().getValueType().equals(this.getValueType())) {
              throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
                  + input.schema().getValueType().getType());
            }
            // input type matches map type
            return AvroData.getMap(this, input.value());
          }
        };
      case STRUCT:
        return new AvroSchema(schema) {
          // Field name -> wrapped field schema, filled by the instance initializer below.
          // A plain HashMap replaces the former double-brace anonymous HashMap subclass.
          private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>();

          {
            for (Field field : schema.getFields()) {
              this.fldSchemas.put(field.name(), getSchema(field.schema()));
            }
          }

          @Override
          public Map<String, Schema> getFields() {
            return this.fldSchemas;
          }

          @Override
          public Schema getFieldType(String fldName) {
            return this.fldSchemas.get(fldName);
          }

          @Override
          public Data transform(Data input) {
            if (input.schema().getType() != Schema.Type.STRUCT) {
              throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
                  + input.schema().getType());
            }
            // Note: this particular transform function only implements "projection to a sub-set" concept.
            // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
            for (Map.Entry<String, Schema> fld : this.fldSchemas.entrySet()) {
              // check each field schema matches input
              String fldName = fld.getKey();
              Schema fldSchema = fld.getValue();
              // The lookup may yield null when the input has no such field; equals() treats null as a
              // mismatch and the message below must not dereference it.
              Schema inputFld = input.schema().getFieldType(fldName);
              if (!fldSchema.equals(inputFld)) {
                throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
                    + ". input field schema:" + (inputFld == null ? "<missing>" : inputFld.getType())
                    + ", this field schema: " + fldSchema.getType());
              }
            }
            // input type matches struct type
            return AvroData.getStruct(this, input.value());
          }

        };
      default:
        throw new IllegalArgumentException("Un-recognized complext data type:" + type);
    }
  }

  // Instances are only created through getSchema() and the static initializer.
  private AvroSchema(org.apache.avro.Schema schema) {
    this.avroSchema = schema;
    this.type = mapType(schema.getType());
  }

  /**
   * Maps an Avro type to the corresponding {@link Schema.Type}.
   *
   * @throws IllegalArgumentException for Avro types without a case below
   */
  private static Type mapType(org.apache.avro.Schema.Type type) {
    switch (type) {
      case ARRAY:
        return Schema.Type.ARRAY;
      case RECORD:
        return Schema.Type.STRUCT;
      case MAP:
        return Schema.Type.MAP;
      case INT:
        return Schema.Type.INTEGER;
      case LONG:
        return Schema.Type.LONG;
      case BOOLEAN:
        return Schema.Type.BOOLEAN;
      case FLOAT:
        return Schema.Type.FLOAT;
      case DOUBLE:
        return Schema.Type.DOUBLE;
      case STRING:
        return Schema.Type.STRING;
      case BYTES:
        return Schema.Type.BYTES;
      default:
        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
    }
  }

  @Override
  public Type getType() {
    return this.type;
  }

  /**
   * @return the wrapped element schema of an array schema
   * @throws UnsupportedOperationException if this schema is not an array
   */
  @Override
  public Schema getElementType() {
    if (this.type != Schema.Type.ARRAY) {
      throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
    }
    return getSchema(this.avroSchema.getElementType());
  }

  /**
   * @return the wrapped value schema of a map schema
   * @throws UnsupportedOperationException if this schema is not a map
   */
  @Override
  public Schema getValueType() {
    if (this.type != Schema.Type.MAP) {
      throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
    }
    return getSchema(this.avroSchema.getValueType());
  }

  /** Always throws here; the struct schema created in {@link #getSchema} overrides it. */
  @Override
  public Map<String, Schema> getFields() {
    throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
  }

  /** Always throws here; the struct schema created in {@link #getSchema} overrides it. */
  @Override
  public Schema getFieldType(String fldName) {
    throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
  }

  /**
   * Wraps a raw object as {@link Data} for array, map and record schemas. The shared primitive
   * schemas override this method with their own typed readers.
   *
   * @throws UnsupportedOperationException if the wrapped Avro type is not ARRAY, MAP or RECORD
   */
  @Override
  public Data read(Object object) {
    if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
      return AvroData.getArray(this, object);
    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
      return AvroData.getMap(this, object);
    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
      return AvroData.getStruct(this, object);
    }
    throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
  }

  /**
   * Default transform, used by the primitive schemas: returns the input unchanged when its type
   * matches this schema's type.
   *
   * @throws IllegalArgumentException if the input is an array, map or struct, or its type differs
   */
  @Override
  public Data transform(Data inputData) {
    if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
        || inputData.schema().getType() == Schema.Type.STRUCT) {
      throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
    }
    if (inputData.schema().getType() != this.type) {
      throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
          + ", input type:" + inputData.schema().getType());
    }
    return inputData;
  }

  /**
   * Structural comparison with another schema.
   *
   * <p>Note that this is an overload taking {@link Schema}, not an override of
   * {@link Object#equals(Object)}, so hash-based collections do not use it.
   *
   * @param other the schema to compare with; {@code null} is never equal
   * @return {@code true} if the types match and, for complex types, the element, value or field
   *         schemas of this schema equal the corresponding ones in {@code other}
   */
  @Override
  public boolean equals(Schema other) {
    // A missing counterpart (e.g. a field absent from the other struct) is a mismatch, not an error.
    if (other == null) {
      return false;
    }
    if (this.type != other.getType()) {
      return false;
    }
    switch (this.type) {
      case ARRAY:
        // check if element types are the same
        return this.getElementType().equals(other.getElementType());
      case MAP:
        // check if value types are the same
        return this.getValueType().equals(other.getValueType());
      case STRUCT:
        // check if the fields schemas in this equals the other
        // NOTE: this check is consistent with the "projection to subset" concept implemented in transform():
        // only the fields of this schema are compared, extra fields in the other schema are ignored
        for (String fieldName : this.getFields().keySet()) {
          if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
            return false;
          }
        }
        return true;
      default:
        return true;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java
---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java new file mode 100644 index 0000000..97a3b6c --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerde.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.samza.sql.calcite.data.serializers;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.samza.SamzaException;
import org.apache.samza.serializers.Serde;
import org.apache.samza.sql.calcite.data.avro.AvroData;
import org.apache.samza.sql.calcite.data.avro.AvroSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * {@link Serde} that converts between Avro binary encoding and {@link AvroData}, using one fixed
 * Avro schema for both reading and writing.
 */
public class SqlAvroSerde implements Serde<AvroData> {
  private static final Logger log = LoggerFactory.getLogger(SqlAvroSerde.class);

  private final Schema avroSchema;
  private final GenericDatumReader<GenericRecord> reader;
  private final GenericDatumWriter<Object> writer;

  /**
   * @param avroSchema schema used by both the datum reader and the datum writer
   */
  public SqlAvroSerde(Schema avroSchema) {
    this.avroSchema = avroSchema;
    this.reader = new GenericDatumReader<GenericRecord>(avroSchema);
    this.writer = new GenericDatumWriter<Object>(avroSchema);
  }

  /**
   * Decodes Avro binary bytes and wraps the result according to the configured schema.
   *
   * @throws SamzaException if the Avro decoder reports an {@link IOException}
   */
  @Override
  public AvroData fromBytes(byte[] bytes) {
    try {
      GenericRecord data = reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(bytes, null));
      return getAvroData(data, avroSchema);
    } catch (IOException e) {
      String errMsg = "Cannot decode message.";
      log.error(errMsg, e);
      throw new SamzaException(errMsg, e);
    }
  }

  /**
   * Encodes the raw value held by {@code object} as Avro binary.
   *
   * @throws SamzaException if the Avro encoder reports an {@link IOException}
   */
  @Override
  public byte[] toBytes(AvroData object) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder encoder = new BinaryEncoder(out);

    try {
      writer.write(object.value(), encoder);
      encoder.flush();
      return out.toByteArray();
    } catch (IOException e) {
      String errMsg = "Cannot perform Avro binary encode.";
      log.error(errMsg, e);
      throw new SamzaException(errMsg, e);
    }
  }

  /**
   * Picks the {@link AvroData} factory that matches the Avro type of {@code type}.
   *
   * NOTE(review): the reader is typed to GenericRecord, so presumably only RECORD schemas are
   * expected in practice; the other branches are kept as in the original - confirm with callers.
   *
   * @throws IllegalArgumentException if the Avro type has no case below
   */
  private AvroData getAvroData(GenericRecord data, Schema type) {
    AvroSchema schema = AvroSchema.getSchema(type);
    switch (type.getType()) {
      case RECORD:
        return AvroData.getStruct(schema, data);
      case ARRAY:
        return AvroData.getArray(schema, data);
      case MAP:
        return AvroData.getMap(schema, data);
      case INT:
        return AvroData.getInt(schema, data);
      case LONG:
        return AvroData.getLong(schema, data);
      case BOOLEAN:
        return AvroData.getBoolean(schema, data);
      case FLOAT:
        return AvroData.getFloat(schema, data);
      case DOUBLE:
        return AvroData.getDouble(schema, data);
      case STRING:
        return AvroData.getString(schema, data);
      case BYTES:
        return AvroData.getBytes(schema, data);
      default:
        throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
    }
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java new file mode 100644 index 0000000..caf4009 --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.samza.sql.calcite.data.serializers;

import org.apache.avro.Schema;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.sql.calcite.data.avro.AvroData;

/**
 * Builds {@link SqlAvroSerde} instances from job configuration. The Avro schema text is read from
 * the config entry named by {@link #PROP_AVRO_SCHEMA}, formatted with the serde name.
 */
public class SqlAvroSerdeFactory implements SerdeFactory<AvroData> {
  public static final String PROP_AVRO_SCHEMA = "serializers.%s.schema";

  /**
   * @param name   serde name, substituted into {@link #PROP_AVRO_SCHEMA} to form the config key
   * @param config configuration holding the schema text
   * @return a serde bound to the parsed schema
   * @throws SamzaException if the config entry is absent or empty
   */
  @Override
  public Serde<AvroData> getSerde(String name, Config config) {
    final String schemaKey = String.format(PROP_AVRO_SCHEMA, name);
    final String schemaText = config.get(schemaKey);

    final boolean schemaPresent = schemaText != null && !schemaText.isEmpty();
    if (schemaPresent) {
      return new SqlAvroSerde(Schema.parse(schemaText));
    }
    throw new SamzaException("Cannot find avro schema for SerdeFactory '" + name + "'.");
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java new file mode 100644 index 0000000..6651e97 --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerde.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.samza.sql.calcite.data.serializers;

import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.sql.calcite.data.string.StringData;


/**
 * {@link Serde} for {@link StringData} that hands the byte conversion to a {@link StringSerde}
 * created with the encoding given at construction time.
 */
public class SqlStringSerde implements Serde<StringData> {

  // Performs the actual String <-> byte[] conversion.
  private final Serde<String> delegate;

  /**
   * @param encoding encoding passed to the underlying {@link StringSerde}
   */
  public SqlStringSerde(String encoding) {
    delegate = new StringSerde(encoding);
  }

  /** Decodes the bytes to a string and wraps it in a {@link StringData}. */
  @Override
  public StringData fromBytes(byte[] bytes) {
    final String decoded = delegate.fromBytes(bytes);
    return new StringData(decoded);
  }

  /** Encodes the string value carried by {@code object}. */
  @Override
  public byte[] toBytes(StringData object) {
    final String text = object.strValue();
    return delegate.toBytes(text);
  }
}
