Repository: samza Updated Branches: refs/heads/master 8090d6539 -> 6743df319
Revert "SAMZA-484; define serialization for tuples in samza-sql" This reverts commit eedf2e7204fc01e32bd21c454ff83a36a23f6105. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f44a6929 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f44a6929 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f44a6929 Branch: refs/heads/master Commit: f44a692914b3005cd85acd64a52be6415ae49c59 Parents: 8090d65 Author: Chris Riccomini <[email protected]> Authored: Thu Feb 12 14:10:33 2015 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Feb 12 14:10:33 2015 -0800 ---------------------------------------------------------------------- build.gradle | 10 +- gradle/dependency-versions.gradle | 1 - samza-sql/README | 1 + samza-sql/README.md | 1 - .../org/apache/samza/sql/api/data/Data.java | 54 ---- .../org/apache/samza/sql/api/data/Schema.java | 55 ---- .../org/apache/samza/sql/api/data/Tuple.java | 4 +- .../samza/sql/data/IncomingMessageTuple.java | 9 +- .../apache/samza/sql/data/avro/AvroData.java | 262 ---------------- .../apache/samza/sql/data/avro/AvroSchema.java | 296 ------------------- .../sql/data/serializers/SqlStringSerde.java | 45 --- .../data/serializers/SqlStringSerdeFactory.java | 33 --- .../samza/sql/data/string/StringData.java | 101 ------- .../samza/sql/data/string/StringSchema.java | 73 ----- .../sql/operators/partition/PartitionOp.java | 5 +- 15 files changed, 18 insertions(+), 932 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index b49c313..e6b10fc 100644 --- a/build.gradle +++ b/build.gradle @@ -78,7 +78,7 @@ rat { 'gradlew', 'gradlew.bat', 'samza-test/state/mystore/**', - '**/README.md', + 'README.md', 'RELEASE.md', ] } @@ -249,12 +249,18 @@ project(":samza-yarn_$scalaVersion") { project(":samza-sql_$scalaVersion") { apply plugin: 'java' + configurations { + // Remove transitive dependencies from Zookeeper that we don't want. + compile.exclude group: 'javax.jms', module: 'jms' + compile.exclude group: 'com.sun.jdmk', module: 'jmxtools' + compile.exclude group: 'com.sun.jmx', module: 'jmxri' + } + dependencies { compile project(':samza-api') compile project(":samza-core_$scalaVersion") compile project(":samza-kv_$scalaVersion") compile "commons-collections:commons-collections:$commonsCollectionVersion" - compile "org.apache.avro:avro:$avroVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 03c72f8..6f815b2 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -35,5 +35,4 @@ guavaVersion = "17.0" commonsCodecVersion = "1.9" commonsCollectionVersion = "3.2.1" - avroVersion = "1.7.7" } http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/README ---------------------------------------------------------------------- diff --git a/samza-sql/README b/samza-sql/README new file mode 100644 index 0000000..65b7558 --- /dev/null +++ b/samza-sql/README @@ -0,0 +1 @@ +samza-sql is an experimental module that is under development (SAMZA-390). \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/README.md ---------------------------------------------------------------------- diff --git a/samza-sql/README.md b/samza-sql/README.md deleted file mode 100644 index 598670b..0000000 --- a/samza-sql/README.md +++ /dev/null @@ -1 +0,0 @@ -samza-sql is an experimental module that is under development (SAMZA-390). http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java deleted file mode 100644 index d1b8409..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -import java.util.List; -import java.util.Map; - - -public interface Data { - - Schema schema(); - - Object value(); - - int intValue(); - - long longValue(); - - float floatValue(); - - double doubleValue(); - - boolean booleanValue(); - - String strValue(); - - byte[] bytesValue(); - - List<Object> arrayValue(); - - Map<Object, Object> mapValue(); - - Data getElement(int index); - - Data getFieldData(String fldName); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java deleted file mode 100644 index 1e8f192..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.api.data; - -import java.util.Map; - - -public interface Schema { - - enum Type { - INTEGER, - LONG, - FLOAT, - DOUBLE, - BOOLEAN, - STRING, - BYTES, - STRUCT, - ARRAY, - MAP - }; - - Type getType(); - - Schema getElementType(); - - Schema getValueType(); - - Map<String, Schema> getFields(); - - Schema getFieldType(String fldName); - - Data read(Object object); - - Data transform(Data inputData); - - boolean equals(Schema other); -} http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java index bc8efcf..0c21a53 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java @@ -32,7 +32,7 @@ public interface Tuple { * * @return Message object in the tuple */ - Data getMessage(); + Object getMessage(); /** * Method to indicate whether the tuple is a delete tuple or an insert tuple @@ -46,7 +46,7 @@ public interface Tuple { * * @return The <code>key</code> of the tuple */ - Data getKey(); + Object getKey(); /** * Get the stream name of the tuple. Note this stream name should be unique in the system. http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java index f868e5c..a8a55e2 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java @@ -18,7 +18,6 @@ */ package org.apache.samza.sql.data; -import org.apache.samza.sql.api.data.Data; import org.apache.samza.sql.api.data.EntityName; import org.apache.samza.sql.api.data.Tuple; import org.apache.samza.system.IncomingMessageEnvelope; @@ -53,8 +52,8 @@ public class IncomingMessageTuple implements Tuple { // TODO: the return type should be changed to the generic data type @Override - public Data getMessage() { - return (Data) this.imsg.getMessage(); + public Object getMessage() { + return this.imsg.getMessage(); } @Override @@ -63,8 +62,8 @@ public class IncomingMessageTuple implements Tuple { } @Override - public Data getKey() { - return (Data) this.imsg.getKey(); + public Object getKey() { + return imsg.getKey(); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java deleted file mode 100644 index d040be9..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.avro; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import org.apache.avro.generic.GenericArray; -import org.apache.avro.generic.GenericRecord; -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.Schema; - - -public class AvroData implements Data { - protected final Object datum; - protected final AvroSchema schema; - - private AvroData(AvroSchema schema, Object datum) { - this.datum = datum; - this.schema = schema; - } - - @Override - public Schema schema() { - return this.schema; - } - - @Override - public Object value() { - return this.datum; - } - - @Override - public int intValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public long longValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public float floatValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public double doubleValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public boolean booleanValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public String strValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public byte[] bytesValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public List<Object> arrayValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Map<Object, Object> mapValue() { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Data getElement(int index) { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - @Override - public Data getFieldData(String fldName) { - throw new UnsupportedOperationException("Can't get value for an unknown data type."); - } - - public static AvroData getArray(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.ARRAY) { - throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - @SuppressWarnings("unchecked") - private final GenericArray<Object> array = (GenericArray<Object>) this.datum; - - @Override - public List<Object> arrayValue() { - return this.array; - } - - @Override - public Data getElement(int index) { - return this.schema.getElementType().read(array.get(index)); - } - - }; - } - - public static AvroData getMap(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.MAP) { - throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - @SuppressWarnings("unchecked") - private final Map<Object, Object> map = (Map<Object, Object>) datum; - - @Override - public Map<Object, Object> mapValue() { - return this.map; - } - - @Override - public Data getFieldData(String fldName) { - // TODO Auto-generated method stub - return this.schema.getValueType().read(map.get(fldName)); - } - - }; - } - - public static AvroData getStruct(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.STRUCT) { - throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType()); - } - return new AvroData(schema, datum) { - private final GenericRecord record = (GenericRecord) datum; - - @Override - public Data getFieldData(String fldName) { - // TODO Auto-generated method stub - return this.schema.getFieldType(fldName).read(record.get(fldName)); - } - - }; - } - - public static AvroData getInt(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public int intValue() { - return ((Integer) datum).intValue(); - } - - }; - } - - public static AvroData getLong(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public long longValue() { - return ((Long) datum).longValue(); - } - - }; - } - - public static AvroData getFloat(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public float floatValue() { - return ((Float) datum).floatValue(); - } - - }; - } - - public static AvroData getDouble(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public double doubleValue() { - return ((Double) datum).doubleValue(); - } - - }; - } - - public static AvroData getBoolean(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public boolean booleanValue() { - return ((Boolean) datum).booleanValue(); - } - - }; - } - - public static AvroData getString(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public String strValue() { - return ((CharSequence) datum).toString(); - } - - }; - } - - public static AvroData getBytes(AvroSchema schema, Object datum) { - if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) { - throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: " - + datum.getClass().getName()); - } - return new AvroData(schema, datum) { - @Override - public byte[] bytesValue() { - return ((ByteBuffer) datum).array(); - } - - }; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java deleted file mode 100644 index 577cf74..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.avro; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.avro.Schema.Field; -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.Schema; - - -public class AvroSchema implements Schema { - - protected final org.apache.avro.Schema avroSchema; - protected final Schema.Type type; - - private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas = - new HashMap<org.apache.avro.Schema.Type, AvroSchema>(); - - static { - primSchemas.put(org.apache.avro.Schema.Type.INT, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) { - @Override - public Data read(Object datum) { - return AvroData.getInt(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.LONG, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) { - @Override - public Data read(Object datum) { - return AvroData.getLong(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.FLOAT, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) { - @Override - public Data read(Object datum) { - return AvroData.getFloat(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.DOUBLE, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) { - @Override - public Data read(Object datum) { - return AvroData.getDouble(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) { - @Override - public Data read(Object datum) { - return AvroData.getBoolean(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.STRING, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) { - @Override - public Data read(Object datum) { - return AvroData.getString(this, datum); - } - }); - primSchemas.put(org.apache.avro.Schema.Type.BYTES, - new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) { - @Override - public Data read(Object datum) { - return AvroData.getBytes(this, datum); - } - }); - }; - - public static AvroSchema getSchema(final org.apache.avro.Schema schema) { - Schema.Type type = mapType(schema.getType()); - if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) { - return primSchemas.get(schema.getType()); - } - // otherwise, construct the new schema - // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects - switch (type) { - case ARRAY: - return new AvroSchema(schema) { - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.ARRAY) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - if (!input.schema().getElementType().equals(this.getElementType())) { - throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: " - + input.schema().getElementType().getType()); - } - // input type matches array type - return AvroData.getArray(this, input.value()); - } - }; - case MAP: - return new AvroSchema(schema) { - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.MAP) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - if (!input.schema().getValueType().equals(this.getValueType())) { - throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: " - + input.schema().getValueType().getType()); - } - // input type matches map type - return AvroData.getMap(this, input.value()); - } - }; - case STRUCT: - return new AvroSchema(schema) { - @SuppressWarnings("serial") - private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() { - { - for (Field field : schema.getFields()) { - put(field.name(), getSchema(field.schema())); - } - } - }; - - @Override - public Map<String, Schema> getFields() { - return this.fldSchemas; - } - - @Override - public Schema getFieldType(String fldName) { - return this.fldSchemas.get(fldName); - } - - @Override - public Data transform(Data input) { - // This would get all the elements until the length of the current schema's array length - if (input.schema().getType() != Schema.Type.STRUCT) { - throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: " - + input.schema().getType()); - } - // Note: this particular transform function only implements "projection to a sub-set" concept. - // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed - for (String fldName : this.fldSchemas.keySet()) { - // check each field schema matches input - Schema fldSchema = this.fldSchemas.get(fldName); - Schema inputFld = input.schema().getFieldType(fldName); - if (!fldSchema.equals(inputFld)) { - throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName - + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType()); - } - } - // input type matches struct type - return AvroData.getStruct(this, input.value()); - } - - }; - default: - throw new IllegalArgumentException("Un-recognized complext data type:" + type); - } - } - - private AvroSchema(org.apache.avro.Schema schema) { - this.avroSchema = schema; - this.type = mapType(schema.getType()); - } - - private static Type mapType(org.apache.avro.Schema.Type type) { - switch (type) { - case ARRAY: - return Schema.Type.ARRAY; - case RECORD: - return Schema.Type.STRUCT; - case MAP: - return Schema.Type.MAP; - case INT: - return Schema.Type.INTEGER; - case LONG: - return Schema.Type.LONG; - case BOOLEAN: - return Schema.Type.BOOLEAN; - case FLOAT: - return Schema.Type.FLOAT; - case DOUBLE: - return Schema.Type.DOUBLE; - case STRING: - return Schema.Type.STRING; - case BYTES: - return Schema.Type.BYTES; - default: - throw new IllegalArgumentException("Avro schema: " + type + " is not supported"); - } - } - - @Override - public Type getType() { - return this.type; - } - - @Override - public Schema getElementType() { - if (this.type != Schema.Type.ARRAY) { - throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); - } - return getSchema(this.avroSchema.getElementType()); - } - - @Override - public Schema getValueType() { - if (this.type != Schema.Type.MAP) { - throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type); - } - return getSchema(this.avroSchema.getValueType()); - } - - @Override - public Map<String, Schema> getFields() { - throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); - } - - @Override - public Schema getFieldType(String fldName) { - throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); - } - - @Override - public Data read(Object object) { - if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) { - return AvroData.getArray(this, object); - } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) { - return AvroData.getMap(this, object); - } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) { - return AvroData.getStruct(this, object); - } - throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported"); - } - - @Override - public Data transform(Data inputData) { - if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP - || inputData.schema().getType() == Schema.Type.STRUCT) { - throw new IllegalArgumentException("Complex schema should have overriden the default transform() function."); - } - if (inputData.schema().getType() != this.type) { - throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type - + ", input type:" + inputData.schema().getType()); - } - return inputData; - } - - @Override - public boolean equals(Schema other) { - // TODO Auto-generated method stub - if (this.type != other.getType()) { - return false; - } - switch (this.type) { - case ARRAY: - // check if element types are the same - return this.getElementType().equals(other.getElementType()); - case MAP: - // check if value types are the same - return this.getValueType().equals(other.getValueType()); - case STRUCT: - // check if the fields schemas in this equals the other - // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform() - for (String fieldName : this.getFields().keySet()) { - if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) { - return false; - } - } - return true; - default: - return true; - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java deleted file mode 100644 index 1f0c3b2..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.serializers; - -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.sql.data.string.StringData; - -import java.io.UnsupportedEncodingException; - -public class SqlStringSerde implements Serde<StringData> { - - private final Serde<String> serde; - - public SqlStringSerde(String encoding) { - this.serde = new StringSerde(encoding); - } - - @Override - public StringData fromBytes(byte[] bytes) { - return new StringData(serde.fromBytes(bytes)); - } - - @Override - public byte[] toBytes(StringData object) { - return serde.toBytes(object.strValue()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java deleted file mode 100644 index 2564479..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.serializers; - - -import org.apache.samza.config.Config; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerdeFactory; -import org.apache.samza.sql.data.string.StringData; - -public class SqlStringSerdeFactory implements SerdeFactory<StringData> { - @Override - public Serde<StringData> getSerde(String name, Config config) { - return new SqlStringSerde(config.get("encoding", "UTF-8")); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java deleted file mode 100644 index b81d9fa..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.string; - -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.Schema; - -import java.util.List; -import java.util.Map; - -public class StringData implements Data { - private final Object datum; - private final Schema schema; - - public StringData(Object datum) { - this.datum = datum; - this.schema = new StringSchema(); - } - - @Override - public Schema schema() { - return this.schema; - } - - @Override - public Object value() { - return this.datum; - } - - @Override - public int intValue() { - throw new UnsupportedOperationException("Can't get int value for a string type data"); - } - - @Override - public long longValue() { - throw new UnsupportedOperationException("Can't get long value for a string type data"); - } - - @Override - public float floatValue() { - throw new UnsupportedOperationException("Can't get float value for a string type data"); - } - - @Override - public double doubleValue() { - throw new UnsupportedOperationException("Can't get double value for a string type data"); - } - - @Override - public boolean booleanValue() { - throw new UnsupportedOperationException("Can't get boolean value for a string type data"); - } - - @Override - public String strValue() { - return String.valueOf(datum); - } - - @Override - public byte[] bytesValue() { - throw new UnsupportedOperationException("Can't get bytesValue for a string type data"); - } - - @Override - public List<Object> arrayValue() { - throw new UnsupportedOperationException("Can't get arrayValue for a string type data"); - } - - @Override - public Map<Object, Object> mapValue() { - throw new UnsupportedOperationException("Can't get mapValue for a string type data"); - } - - @Override - public Data getElement(int index) { - throw new UnsupportedOperationException("Can't getElement(index) on a string type data"); - } - - @Override - public Data getFieldData(String fldName) { - throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java deleted file mode 100644 index 348fc0c..0000000 --- a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.sql.data.string; - -import org.apache.samza.sql.api.data.Data; -import org.apache.samza.sql.api.data.Schema; - -import java.util.Map; - -public class StringSchema implements Schema { - private Type type = Type.STRING; - - @Override - public Type getType() { - return Type.STRING; - } - - @Override - public Schema getElementType() { - throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); - } - - @Override - public Schema getValueType() { - throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type); - } - - @Override - public Map<String, Schema> getFields() { - throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); - } - - @Override - public Schema getFieldType(String fldName) { - throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); - } - - @Override - public Data read(Object object) { - return new StringData(object); - } - - @Override - public Data transform(Data inputData) { - if (inputData.schema().getType() != this.type) { - throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type - + ", input type:" + inputData.schema().getType()); - } - return inputData; - } - - @Override - public boolean equals(Schema other) { - return other.getType() == this.type; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java index 986d688..7921d4f 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java @@ -82,8 +82,9 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator { @Override public void process(Tuple tuple, SqlMessageCollector collector) throws Exception { - collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(), - tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getMessage().value())); + collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey(), + null /* TODO: when merge with Schema API changes, use: tuple + .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, tuple.getMessage())); } }
