Repository: samza Updated Branches: refs/heads/samza-sql 1dac25e17 -> a7de73594
http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java new file mode 100644 index 0000000..80fb542 --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.calcite.data.serializers; + + +import org.apache.samza.config.Config; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerdeFactory; +import org.apache.samza.sql.calcite.data.string.StringData; + +/** SerdeFactory for StringData values that hands out SqlStringSerde instances. */ public class SqlStringSerdeFactory implements SerdeFactory<StringData> { + /** Builds a new SqlStringSerde from the value of the encoding config key, passing UTF-8 as the second argument to config.get (presumably the fallback used when the key is unset - confirm against Config). @param name serde name; not used by this implementation. @param config configuration queried for the encoding key. @return a new SqlStringSerde constructed with that value. */ @Override + public Serde<StringData> getSerde(String name, Config config) { + return new SqlStringSerde(config.get("encoding", "UTF-8")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java new file mode 100644 index 0000000..f7c8121 --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.calcite.data.string; + +import org.apache.samza.sql.calcite.data.Data; +import org.apache.samza.sql.calcite.data.Schema; + +import java.util.List; +import java.util.Map; + +/** Data implementation that wraps one datum together with a StringSchema. Only schema(), value() and strValue() return a result; every other accessor throws UnsupportedOperationException. */ public class StringData implements Data { + private final Object datum; + private final Schema schema; + + /** Stores the datum as given (no null or type check) and allocates a new StringSchema for this instance. */ public StringData(Object datum) { + this.datum = datum; + this.schema = new StringSchema(); + } + + @Override + public Schema schema() { + return this.schema; + } + + @Override + public Object value() { + return this.datum; + } + + /* Unsupported for string data: always throws. The same holds for the long, float, double, boolean, bytes, array, map, element and field accessors of this class. */ @Override + public int intValue() { + throw new UnsupportedOperationException("Can't get int value for a string type data"); + } + + @Override + public long longValue() { + throw new UnsupportedOperationException("Can't get long value for a string type data"); + } + + @Override + public float floatValue() { + throw new UnsupportedOperationException("Can't get float value for a string type data"); + } + + @Override + public double doubleValue() { + throw new UnsupportedOperationException("Can't get double value for a string type data"); + } + + @Override + public boolean booleanValue() { + throw new UnsupportedOperationException("Can't get boolean value for a string type data"); + } + + /** Returns String.valueOf(datum); a null datum therefore produces the four-character text null instead of a null reference. */ @Override + public String strValue() { + return String.valueOf(datum); + } + + @Override + public byte[] bytesValue() { + throw new UnsupportedOperationException("Can't get bytesValue for a string type data"); + } + + @Override + public List<Object> arrayValue() { + throw new UnsupportedOperationException("Can't get arrayValue for a string type data"); + } + + @Override + public Map<Object, Object> mapValue() { + throw new UnsupportedOperationException("Can't get mapValue for a string type data"); + } + + @Override + public Data getElement(int index) { + throw new UnsupportedOperationException("Can't getElement(index) on a string type data"); + } + + @Override + public Data getFieldData(String fldName) { + throw new 
UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java new file mode 100644 index 0000000..829d61f --- /dev/null +++ b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.sql.calcite.data.string; + +import org.apache.samza.sql.calcite.data.Data; +import org.apache.samza.sql.calcite.data.Schema; + +import java.util.Map; + +/** Schema describing string data. getType() reports Type.STRING, read(Object) wraps values in StringData, transform(Data) passes matching data through, and the element, value and field lookups all throw UnsupportedOperationException. equals(Schema) compares schema types only and dereferences its argument without a null check. NOTE(review): because it takes a Schema parameter, that method overloads rather than overrides Object.equals(Object). */ public class StringSchema implements Schema { + /* Only read, never reassigned, inside this class. NOTE(review): could be declared final. */ private Type type = Type.STRING; + + /** Returns the Type.STRING constant directly rather than the type field. */ @Override + public Type getType() { + return Type.STRING; + } + + /* NOTE(review): the exception text below spells the method name getElmentType. */ @Override + public Schema getElementType() { + throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type); + } + + @Override + public Schema getValueType() { + throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type); + } + + @Override + public Map<String, Schema> getFields() { + throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type); + } + + @Override + public Schema getFieldType(String fldName) { + throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type); + } + + /** Wraps the object in a new StringData without inspecting it. */ @Override + public Data read(Object object) { + return new StringData(object); + } + + /** Returns inputData unchanged when its schema type equals this schema type; otherwise throws IllegalArgumentException with both types in the message. */ @Override + public Data transform(Data inputData) { + if (inputData.schema().getType() != this.type) { + throw new IllegalArgumentException("Can't transform a mismatched primitive type. 
this type:" + this.type + + ", input type:" + inputData.schema().getType()); + } + return inputData; + } + + @Override + public boolean equals(Schema other) { + return other.getType() == this.type; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java ---------------------------------------------------------------------- diff --git a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java new file mode 100644 index 0000000..b891ddc --- /dev/null +++ b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.sql.calcite.data.serializers; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.serializers.Serde; +import org.apache.samza.sql.calcite.data.avro.AvroData; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** JUnit tests that decode and re-encode an Avro Order record through the serde returned by SqlAvroSerdeFactory. */ public class SqlAvroSerdeTest { + /** Avro record schema (JSON text) named Order in namespace org.apache.samza.operators, with int id, string product and int quantity fields. */ public static final String ORDER_SCHEMA = "{\"namespace\": \"org.apache.samza.operators\",\n"+ + " \"type\": \"record\",\n"+ + " \"name\": \"Order\",\n"+ + " \"fields\": [\n"+ + " {\"name\": \"id\", \"type\": \"int\"},\n"+ + " {\"name\": \"product\", \"type\": \"string\"},\n"+ + " {\"name\": \"quantity\", \"type\": \"int\"}\n"+ + " ]\n"+ + "}"; + + /** ORDER_SCHEMA parsed into an Avro Schema; used to build and encode the sample record. */ public static Schema orderSchema = Schema.parse(ORDER_SCHEMA); + + /* Raw Serde shared by both tests; created once from the config returned by sqlAvroSerdeTestConfig(). */ private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", sqlAvroSerdeTestConfig()); + + /** Decodes a binary-encoded sample Order and checks the resulting schema types: STRUCT for the record, INTEGER for id and quantity, STRING for product. Field values are not asserted. */ @Test + public void testSqlAvroSerdeDeserialization() throws IOException { + AvroData decodedDatum = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); + + Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRUCT); + Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRING); + } + + /** Round trip: decodes the sample, re-encodes it with toBytes, decodes the result again and repeats the same schema type checks. */ @Test + public void testSqlAvroSerialization() throws IOException 
{ + AvroData decodedDatumOriginal = (AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema)); + /* serde is declared as a raw Serde, so the toBytes call below is unchecked. */ @SuppressWarnings("unchecked") + byte[] encodedDatum = serde.toBytes(decodedDatumOriginal); + + AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum); + + Assert.assertTrue(decodedDatum.schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRUCT); + Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.INTEGER); + Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() == org.apache.samza.sql.calcite.data.Schema.Type.STRING); + } + + /** Returns a MapConfig whose only entry stores ORDER_SCHEMA under the key serializers.sqlAvro.schema. */ private static Config sqlAvroSerdeTestConfig(){ + Map<String, String> config = new HashMap<String, String>(); + config.put("serializers.sqlAvro.schema", ORDER_SCHEMA); + + return new MapConfig(config); + } + + /** Writes the record with a GenericDatumWriter for avroSchema through a BinaryEncoder, flushes the encoder and returns the collected bytes. */ private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) throws IOException { + DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + BinaryEncoder encoder = new BinaryEncoder(output); + writer.write(datum, encoder); + encoder.flush(); + + return output.toByteArray(); + } + + /** Builds the sample Order record against orderSchema: id 1, product paint, quantity 3. */ private static GenericRecord sampleOrderRecord(){ + GenericData.Record datum = new GenericData.Record(orderSchema); + datum.put("id", 1); + datum.put("product", "paint"); + datum.put("quantity", 3); + + return datum; + } +}
