Repository: samza
Updated Branches:
  refs/heads/samza-sql 1dac25e17 -> a7de73594


http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
 
b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
new file mode 100644
index 0000000..80fb542
--- /dev/null
+++ 
b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/serializers/SqlStringSerdeFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.serializers;
+
+
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.sql.calcite.data.string.StringData;
+
+public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
+    @Override
+    public Serde<StringData> getSerde(String name, Config config) {
+        return new SqlStringSerde(config.get("encoding", "UTF-8"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
 
b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
new file mode 100644
index 0000000..f7c8121
--- /dev/null
+++ 
b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringData.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.string;
+
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
+
+import java.util.List;
+import java.util.Map;
+
+public class StringData implements Data {
+    private final Object datum;
+    private final Schema schema;
+
+    public StringData(Object datum) {
+        this.datum = datum;
+        this.schema = new StringSchema();
+    }
+
+    @Override
+    public Schema schema() {
+        return this.schema;
+    }
+
+    @Override
+    public Object value() {
+        return this.datum;
+    }
+
+    @Override
+    public int intValue() {
+        throw new UnsupportedOperationException("Can't get int value for a 
string type data");
+    }
+
+    @Override
+    public long longValue() {
+        throw new UnsupportedOperationException("Can't get long value for a 
string type data");
+    }
+
+    @Override
+    public float floatValue() {
+        throw new UnsupportedOperationException("Can't get float value for a 
string type data");
+    }
+
+    @Override
+    public double doubleValue() {
+        throw new UnsupportedOperationException("Can't get double value for a 
string type data");
+    }
+
+    @Override
+    public boolean booleanValue() {
+        throw new UnsupportedOperationException("Can't get boolean value for a 
string type data");
+    }
+
+    @Override
+    public String strValue() {
+        return String.valueOf(datum);
+    }
+
+    @Override
+    public byte[] bytesValue() {
+        throw new UnsupportedOperationException("Can't get bytesValue for a 
string type data");
+    }
+
+    @Override
+    public List<Object> arrayValue() {
+        throw new UnsupportedOperationException("Can't get arrayValue for a 
string type data");
+    }
+
+    @Override
+    public Map<Object, Object> mapValue() {
+        throw new UnsupportedOperationException("Can't get mapValue for a 
string type data");
+    }
+
+    @Override
+    public Data getElement(int index) {
+        throw new UnsupportedOperationException("Can't getElement(index) on a 
string type data");
+    }
+
+    @Override
+    public Data getFieldData(String fldName) {
+        throw new UnsupportedOperationException("Can't getFieldData(fieldName) 
for a string type data");
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
 
b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
new file mode 100644
index 0000000..829d61f
--- /dev/null
+++ 
b/samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/data/string/StringSchema.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.calcite.data.string;
+
+import org.apache.samza.sql.calcite.data.Data;
+import org.apache.samza.sql.calcite.data.Schema;
+
+import java.util.Map;
+
+public class StringSchema implements Schema {
+    private Type type = Type.STRING;
+
+    @Override
+    public Type getType() {
+      return Type.STRING;
+    }
+
+    @Override
+    public Schema getElementType() {
+      throw new UnsupportedOperationException("Can't getElmentType with 
non-array schema: " + this.type);
+    }
+
+    @Override
+    public Schema getValueType() {
+        throw new UnsupportedOperationException("Can't getValueType with 
non-map schema: " + this.type);
+    }
+
+    @Override
+    public Map<String, Schema> getFields() {
+        throw new UnsupportedOperationException("Can't get field types with 
unknown schema type:" + this.type);
+    }
+
+    @Override
+    public Schema getFieldType(String fldName) {
+        throw new UnsupportedOperationException("Can't getFieldType with 
non-map/non-struct schema: " + this.type);
+    }
+
+    @Override
+    public Data read(Object object) {
+        return new StringData(object);
+    }
+
+    @Override
+    public Data transform(Data inputData) {
+        if (inputData.schema().getType() != this.type) {
+            throw new IllegalArgumentException("Can't transform a mismatched 
primitive type. this type:" + this.type
+                    + ", input type:" + inputData.schema().getType());
+        }
+        return inputData;
+    }
+
+    @Override
+    public boolean equals(Schema other) {
+        return other.getType() == this.type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7de7359/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
----------------------------------------------------------------------
diff --git 
a/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
 
b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
new file mode 100644
index 0000000..b891ddc
--- /dev/null
+++ 
b/samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/data/serializers/SqlAvroSerdeTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql.calcite.data.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.sql.calcite.data.avro.AvroData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SqlAvroSerdeTest {
+  public static final String ORDER_SCHEMA = "{\"namespace\": 
\"org.apache.samza.operators\",\n"+
+      " \"type\": \"record\",\n"+
+      " \"name\": \"Order\",\n"+
+      " \"fields\": [\n"+
+      "     {\"name\": \"id\", \"type\": \"int\"},\n"+
+      "     {\"name\": \"product\",  \"type\": \"string\"},\n"+
+      "     {\"name\": \"quantity\", \"type\": \"int\"}\n"+
+      " ]\n"+
+      "}";
+
+  public static Schema orderSchema = Schema.parse(ORDER_SCHEMA);
+
+  private static Serde serde = new SqlAvroSerdeFactory().getSerde("sqlAvro", 
sqlAvroSerdeTestConfig());
+
+  @Test
+  public void testSqlAvroSerdeDeserialization() throws IOException {
+    AvroData decodedDatum = 
(AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
+
+    Assert.assertTrue(decodedDatum.schema().getType() == 
org.apache.samza.sql.calcite.data.Schema.Type.STRUCT);
+    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == 
org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() 
== org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() 
== org.apache.samza.sql.calcite.data.Schema.Type.STRING);
+  }
+
+  @Test
+  public void testSqlAvroSerialization() throws IOException {
+    AvroData decodedDatumOriginal = 
(AvroData)serde.fromBytes(encodeMessage(sampleOrderRecord(), orderSchema));
+    @SuppressWarnings("unchecked")
+    byte[] encodedDatum = serde.toBytes(decodedDatumOriginal);
+
+    AvroData decodedDatum = (AvroData)serde.fromBytes(encodedDatum);
+
+    Assert.assertTrue(decodedDatum.schema().getType() == 
org.apache.samza.sql.calcite.data.Schema.Type.STRUCT);
+    Assert.assertTrue(decodedDatum.getFieldData("id").schema().getType() == 
org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+    Assert.assertTrue(decodedDatum.getFieldData("quantity").schema().getType() 
== org.apache.samza.sql.calcite.data.Schema.Type.INTEGER);
+    Assert.assertTrue(decodedDatum.getFieldData("product").schema().getType() 
== org.apache.samza.sql.calcite.data.Schema.Type.STRING);
+  }
+
+  private static Config sqlAvroSerdeTestConfig(){
+    Map<String, String> config = new HashMap<String, String>();
+    config.put("serializers.sqlAvro.schema", ORDER_SCHEMA);
+
+    return new MapConfig(config);
+  }
+
+  private static byte[] encodeMessage(GenericRecord datum, Schema avroSchema) 
throws IOException {
+    DatumWriter<GenericRecord> writer = new 
GenericDatumWriter<GenericRecord>(avroSchema);
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    BinaryEncoder encoder = new BinaryEncoder(output);
+    writer.write(datum, encoder);
+    encoder.flush();
+
+    return  output.toByteArray();
+  }
+
+  private static GenericRecord sampleOrderRecord(){
+    GenericData.Record datum = new GenericData.Record(orderSchema);
+    datum.put("id", 1);
+    datum.put("product", "paint");
+    datum.put("quantity", 3);
+
+    return datum;
+  }
+}

Reply via email to