Repository: samza
Updated Branches:
  refs/heads/master 8090d6539 -> 6743df319


Revert "SAMZA-484; define serialization for tuples in samza-sql"

This reverts commit eedf2e7204fc01e32bd21c454ff83a36a23f6105.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f44a6929
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f44a6929
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f44a6929

Branch: refs/heads/master
Commit: f44a692914b3005cd85acd64a52be6415ae49c59
Parents: 8090d65
Author: Chris Riccomini <[email protected]>
Authored: Thu Feb 12 14:10:33 2015 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Feb 12 14:10:33 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  10 +-
 gradle/dependency-versions.gradle               |   1 -
 samza-sql/README                                |   1 +
 samza-sql/README.md                             |   1 -
 .../org/apache/samza/sql/api/data/Data.java     |  54 ----
 .../org/apache/samza/sql/api/data/Schema.java   |  55 ----
 .../org/apache/samza/sql/api/data/Tuple.java    |   4 +-
 .../samza/sql/data/IncomingMessageTuple.java    |   9 +-
 .../apache/samza/sql/data/avro/AvroData.java    | 262 ----------------
 .../apache/samza/sql/data/avro/AvroSchema.java  | 296 -------------------
 .../sql/data/serializers/SqlStringSerde.java    |  45 ---
 .../data/serializers/SqlStringSerdeFactory.java |  33 ---
 .../samza/sql/data/string/StringData.java       | 101 -------
 .../samza/sql/data/string/StringSchema.java     |  73 -----
 .../sql/operators/partition/PartitionOp.java    |   5 +-
 15 files changed, 18 insertions(+), 932 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b49c313..e6b10fc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -78,7 +78,7 @@ rat {
     'gradlew',
     'gradlew.bat',
     'samza-test/state/mystore/**',
-    '**/README.md',
+    'README.md',
     'RELEASE.md',
   ]
 }
@@ -249,12 +249,18 @@ project(":samza-yarn_$scalaVersion") {
 project(":samza-sql_$scalaVersion") {
   apply plugin: 'java'
 
+  configurations {
+    // Remove transitive dependencies from Zookeeper that we don't want.
+    compile.exclude group: 'javax.jms', module: 'jms'
+    compile.exclude group: 'com.sun.jdmk', module: 'jmxtools'
+    compile.exclude group: 'com.sun.jmx', module: 'jmxri'
+  }
+
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
     compile project(":samza-kv_$scalaVersion")
     compile "commons-collections:commons-collections:$commonsCollectionVersion"
-    compile "org.apache.avro:avro:$avroVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 03c72f8..6f815b2 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -35,5 +35,4 @@
   guavaVersion = "17.0"
   commonsCodecVersion = "1.9"
   commonsCollectionVersion = "3.2.1"
-  avroVersion = "1.7.7"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/README
----------------------------------------------------------------------
diff --git a/samza-sql/README b/samza-sql/README
new file mode 100644
index 0000000..65b7558
--- /dev/null
+++ b/samza-sql/README
@@ -0,0 +1 @@
+samza-sql is an experimental module that is under development (SAMZA-390).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/README.md
----------------------------------------------------------------------
diff --git a/samza-sql/README.md b/samza-sql/README.md
deleted file mode 100644
index 598670b..0000000
--- a/samza-sql/README.md
+++ /dev/null
@@ -1 +0,0 @@
-samza-sql is an experimental module that is under development (SAMZA-390).

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
deleted file mode 100644
index d1b8409..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.List;
-import java.util.Map;
-
-
-public interface Data {
-
-  Schema schema();
-
-  Object value();
-
-  int intValue();
-
-  long longValue();
-
-  float floatValue();
-
-  double doubleValue();
-
-  boolean booleanValue();
-
-  String strValue();
-
-  byte[] bytesValue();
-
-  List<Object> arrayValue();
-
-  Map<Object, Object> mapValue();
-
-  Data getElement(int index);
-
-  Data getFieldData(String fldName);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
deleted file mode 100644
index 1e8f192..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.api.data;
-
-import java.util.Map;
-
-
-public interface Schema {
-
-  enum Type {
-    INTEGER,
-    LONG,
-    FLOAT,
-    DOUBLE,
-    BOOLEAN,
-    STRING,
-    BYTES,
-    STRUCT,
-    ARRAY,
-    MAP
-  };
-
-  Type getType();
-
-  Schema getElementType();
-
-  Schema getValueType();
-
-  Map<String, Schema> getFields();
-
-  Schema getFieldType(String fldName);
-
-  Data read(Object object);
-
-  Data transform(Data inputData);
-
-  boolean equals(Schema other);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java 
b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
index bc8efcf..0c21a53 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
@@ -32,7 +32,7 @@ public interface Tuple {
    *
    * @return Message object in the tuple
    */
-  Data getMessage();
+  Object getMessage();
 
   /**
    * Method to indicate whether the tuple is a delete tuple or an insert tuple
@@ -46,7 +46,7 @@ public interface Tuple {
    *
    * @return The <code>key</code> of the tuple
    */
-  Data getKey();
+  Object getKey();
 
   /**
    * Get the stream name of the tuple. Note this stream name should be unique 
in the system.

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
index f868e5c..a8a55e2 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.sql.data;
 
-import org.apache.samza.sql.api.data.Data;
 import org.apache.samza.sql.api.data.EntityName;
 import org.apache.samza.sql.api.data.Tuple;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -53,8 +52,8 @@ public class IncomingMessageTuple implements Tuple {
 
   // TODO: the return type should be changed to the generic data type
   @Override
-  public Data getMessage() {
-    return (Data) this.imsg.getMessage();
+  public Object getMessage() {
+    return this.imsg.getMessage();
   }
 
   @Override
@@ -63,8 +62,8 @@ public class IncomingMessageTuple implements Tuple {
   }
 
   @Override
-  public Data getKey() {
-    return (Data) this.imsg.getKey();
+  public Object getKey() {
+    return imsg.getKey();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
deleted file mode 100644
index d040be9..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.avro;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-
-public class AvroData implements Data {
-  protected final Object datum;
-  protected final AvroSchema schema;
-
-  private AvroData(AvroSchema schema, Object datum) {
-    this.datum = datum;
-    this.schema = schema;
-  }
-
-  @Override
-  public Schema schema() {
-    return this.schema;
-  }
-
-  @Override
-  public Object value() {
-    return this.datum;
-  }
-
-  @Override
-  public int intValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public long longValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public float floatValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public double doubleValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public boolean booleanValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public String strValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public byte[] bytesValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public List<Object> arrayValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public Map<Object, Object> mapValue() {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public Data getElement(int index) {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  @Override
-  public Data getFieldData(String fldName) {
-    throw new UnsupportedOperationException("Can't get value for an unknown 
data type.");
-  }
-
-  public static AvroData getArray(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.ARRAY) {
-      throw new IllegalArgumentException("Can't create an array object with 
non-array schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      @SuppressWarnings("unchecked")
-      private final GenericArray<Object> array = (GenericArray<Object>) 
this.datum;
-
-      @Override
-      public List<Object> arrayValue() {
-        return this.array;
-      }
-
-      @Override
-      public Data getElement(int index) {
-        return this.schema.getElementType().read(array.get(index));
-      }
-
-    };
-  }
-
-  public static AvroData getMap(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.MAP) {
-      throw new IllegalArgumentException("Can't create a map object with 
non-map schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      @SuppressWarnings("unchecked")
-      private final Map<Object, Object> map = (Map<Object, Object>) datum;
-
-      @Override
-      public Map<Object, Object> mapValue() {
-        return this.map;
-      }
-
-      @Override
-      public Data getFieldData(String fldName) {
-        // TODO Auto-generated method stub
-        return this.schema.getValueType().read(map.get(fldName));
-      }
-
-    };
-  }
-
-  public static AvroData getStruct(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.STRUCT) {
-      throw new IllegalArgumentException("Can't create a struct object with 
non-struct schema:" + schema.getType());
-    }
-    return new AvroData(schema, datum) {
-      private final GenericRecord record = (GenericRecord) datum;
-
-      @Override
-      public Data getFieldData(String fldName) {
-        // TODO Auto-generated method stub
-        return this.schema.getFieldType(fldName).read(record.get(fldName));
-      }
-
-    };
-  }
-
-  public static AvroData getInt(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof 
Integer)) {
-      throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public int intValue() {
-        return ((Integer) datum).intValue();
-      }
-
-    };
-  }
-
-  public static AvroData getLong(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
-      throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public long longValue() {
-        return ((Long) datum).longValue();
-      }
-
-    };
-  }
-
-  public static AvroData getFloat(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
-      throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public float floatValue() {
-        return ((Float) datum).floatValue();
-      }
-
-    };
-  }
-
-  public static AvroData getDouble(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
-      throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public double doubleValue() {
-        return ((Double) datum).doubleValue();
-      }
-
-    };
-  }
-
-  public static AvroData getBoolean(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof 
Boolean)) {
-      throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public boolean booleanValue() {
-        return ((Boolean) datum).booleanValue();
-      }
-
-    };
-  }
-
-  public static AvroData getString(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.STRING || !(datum instanceof 
CharSequence)) {
-      throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public String strValue() {
-        return ((CharSequence) datum).toString();
-      }
-
-    };
-  }
-
-  public static AvroData getBytes(AvroSchema schema, Object datum) {
-    if (schema.getType() != Schema.Type.BYTES || !(datum instanceof 
ByteBuffer)) {
-      throw new IllegalArgumentException("data object and schema mismatch. 
schema:" + schema.getType() + ", data: "
-          + datum.getClass().getName());
-    }
-    return new AvroData(schema, datum) {
-      @Override
-      public byte[] bytesValue() {
-        return ((ByteBuffer) datum).array();
-      }
-
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
deleted file mode 100644
index 577cf74..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.avro;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.avro.Schema.Field;
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-
-public class AvroSchema implements Schema {
-
-  protected final org.apache.avro.Schema avroSchema;
-  protected final Schema.Type type;
-
-  private final static Map<org.apache.avro.Schema.Type, AvroSchema> 
primSchemas =
-      new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
-
-  static {
-    primSchemas.put(org.apache.avro.Schema.Type.INT,
-        new 
AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getInt(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.LONG,
-        new 
AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getLong(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
-        new 
AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getFloat(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
-        new 
AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getDouble(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
-        new 
AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getBoolean(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.STRING,
-        new 
AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getString(this, datum);
-      }
-    });
-    primSchemas.put(org.apache.avro.Schema.Type.BYTES,
-        new 
AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
-      @Override
-      public Data read(Object datum) {
-        return AvroData.getBytes(this, datum);
-      }
-    });
-  };
-
-  public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
-    Schema.Type type = mapType(schema.getType());
-    if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != 
Schema.Type.STRUCT) {
-      return primSchemas.get(schema.getType());
-    }
-    // otherwise, construct the new schema
-    // TODO: It would be possible to assign each complex schema an ID and 
cache it w/o repeated create in-memory schema objects
-    switch (type) {
-      case ARRAY:
-        return new AvroSchema(schema) {
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current 
schema's array length
-            if (input.schema().getType() != Schema.Type.ARRAY) {
-              throw new IllegalArgumentException("Schema mismatch. Can't 
transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            if 
(!input.schema().getElementType().equals(this.getElementType())) {
-              throw new IllegalArgumentException("Element schema mismatch. 
Can't transfer data. input schema: "
-                  + input.schema().getElementType().getType());
-            }
-            // input type matches array type
-            return AvroData.getArray(this, input.value());
-          }
-        };
-      case MAP:
-        return new AvroSchema(schema) {
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current 
schema's array length
-            if (input.schema().getType() != Schema.Type.MAP) {
-              throw new IllegalArgumentException("Schema mismatch. Can't 
transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            if (!input.schema().getValueType().equals(this.getValueType())) {
-              throw new IllegalArgumentException("Element schema mismatch. 
Can't transfer data. input schema: "
-                  + input.schema().getValueType().getType());
-            }
-            // input type matches map type
-            return AvroData.getMap(this, input.value());
-          }
-        };
-      case STRUCT:
-        return new AvroSchema(schema) {
-          @SuppressWarnings("serial")
-          private final Map<String, Schema> fldSchemas = new HashMap<String, 
Schema>() {
-            {
-              for (Field field : schema.getFields()) {
-                put(field.name(), getSchema(field.schema()));
-              }
-            }
-          };
-
-          @Override
-          public Map<String, Schema> getFields() {
-            return this.fldSchemas;
-          }
-
-          @Override
-          public Schema getFieldType(String fldName) {
-            return this.fldSchemas.get(fldName);
-          }
-
-          @Override
-          public Data transform(Data input) {
-            // This would get all the elements until the length of the current 
schema's array length
-            if (input.schema().getType() != Schema.Type.STRUCT) {
-              throw new IllegalArgumentException("Schema mismatch. Can't 
transfer data. input schema: "
-                  + input.schema().getType());
-            }
-            // Note: this particular transform function only implements 
"projection to a sub-set" concept.
-            // More complex function is needed if some other concepts such as 
"merge from two sets of data", "allow null if does not exist" are needed
-            for (String fldName : this.fldSchemas.keySet()) {
-              // check each field schema matches input
-              Schema fldSchema = this.fldSchemas.get(fldName);
-              Schema inputFld = input.schema().getFieldType(fldName);
-              if (!fldSchema.equals(inputFld)) {
-                throw new IllegalArgumentException("Field schema mismatch. 
Can't transfer data for field " + fldName
-                    + ". input field schema:" + inputFld.getType() + ", this 
field schema: " + fldSchema.getType());
-              }
-            }
-            // input type matches struct type
-            return AvroData.getStruct(this, input.value());
-          }
-
-        };
-      default:
-        throw new IllegalArgumentException("Un-recognized complext data type:" 
+ type);
-    }
-  }
-
-  private AvroSchema(org.apache.avro.Schema schema) {
-    this.avroSchema = schema;
-    this.type = mapType(schema.getType());
-  }
-
-  private static Type mapType(org.apache.avro.Schema.Type type) {
-    switch (type) {
-      case ARRAY:
-        return Schema.Type.ARRAY;
-      case RECORD:
-        return Schema.Type.STRUCT;
-      case MAP:
-        return Schema.Type.MAP;
-      case INT:
-        return Schema.Type.INTEGER;
-      case LONG:
-        return Schema.Type.LONG;
-      case BOOLEAN:
-        return Schema.Type.BOOLEAN;
-      case FLOAT:
-        return Schema.Type.FLOAT;
-      case DOUBLE:
-        return Schema.Type.DOUBLE;
-      case STRING:
-        return Schema.Type.STRING;
-      case BYTES:
-        return Schema.Type.BYTES;
-      default:
-        throw new IllegalArgumentException("Avro schema: " + type + " is not 
supported");
-    }
-  }
-
-  @Override
-  public Type getType() {
-    return this.type;
-  }
-
-  @Override
-  public Schema getElementType() {
-    if (this.type != Schema.Type.ARRAY) {
-      throw new UnsupportedOperationException("Can't getElmentType with 
non-array schema: " + this.type);
-    }
-    return getSchema(this.avroSchema.getElementType());
-  }
-
-  @Override
-  public Schema getValueType() {
-    if (this.type != Schema.Type.MAP) {
-      throw new UnsupportedOperationException("Can't getValueType with non-map 
schema: " + this.type);
-    }
-    return getSchema(this.avroSchema.getValueType());
-  }
-
-  @Override
-  public Map<String, Schema> getFields() {
-    throw new UnsupportedOperationException("Can't get field types with 
unknown schema type:" + this.type);
-  }
-
-  @Override
-  public Schema getFieldType(String fldName) {
-    throw new UnsupportedOperationException("Can't getFieldType with 
non-map/non-struct schema: " + this.type);
-  }
-
-  @Override
-  public Data read(Object object) {
-    if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
-      return AvroData.getArray(this, object);
-    } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
-      return AvroData.getMap(this, object);
-    } else if (this.avroSchema.getType() == 
org.apache.avro.Schema.Type.RECORD) {
-      return AvroData.getStruct(this, object);
-    }
-    throw new UnsupportedOperationException("Reading unknown complext type:" + 
this.type + " is not supported");
-  }
-
-  @Override
-  public Data transform(Data inputData) {
-    if (inputData.schema().getType() == Schema.Type.ARRAY || 
inputData.schema().getType() == Schema.Type.MAP
-        || inputData.schema().getType() == Schema.Type.STRUCT) {
-      throw new IllegalArgumentException("Complex schema should have overriden 
the default transform() function.");
-    }
-    if (inputData.schema().getType() != this.type) {
-      throw new IllegalArgumentException("Can't transform a mismatched 
primitive type. this type:" + this.type
-          + ", input type:" + inputData.schema().getType());
-    }
-    return inputData;
-  }
-
-  @Override
-  public boolean equals(Schema other) {
-    // TODO Auto-generated method stub
-    if (this.type != other.getType()) {
-      return false;
-    }
-    switch (this.type) {
-      case ARRAY:
-        // check if element types are the same
-        return this.getElementType().equals(other.getElementType());
-      case MAP:
-        // check if value types are the same
-        return this.getValueType().equals(other.getValueType());
-      case STRUCT:
-        // check if the fields schemas in this equals the other
-        // NOTE: this equals check is in consistent with the "projection to 
subset" concept implemented in transform()
-        for (String fieldName : this.getFields().keySet()) {
-          if 
(!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
-            return false;
-          }
-        }
-        return true;
-      default:
-        return true;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
deleted file mode 100644
index 1f0c3b2..0000000
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.serializers;
-
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.sql.data.string.StringData;
-
-import java.io.UnsupportedEncodingException;
-
-public class SqlStringSerde implements Serde<StringData> {
-
-    private final Serde<String> serde;
-
-    public SqlStringSerde(String encoding) {
-        this.serde = new StringSerde(encoding);
-    }
-
-    @Override
-    public StringData fromBytes(byte[] bytes) {
-          return new StringData(serde.fromBytes(bytes));
-    }
-
-    @Override
-    public byte[] toBytes(StringData object) {
-        return serde.toBytes(object.strValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
deleted file mode 100644
index 2564479..0000000
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.serializers;
-
-
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.sql.data.string.StringData;
-
-public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
-    @Override
-    public Serde<StringData> getSerde(String name, Config config) {
-        return new SqlStringSerde(config.get("encoding", "UTF-8"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
deleted file mode 100644
index b81d9fa..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.string;
-
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-import java.util.List;
-import java.util.Map;
-
-public class StringData implements Data {
-    private final Object datum;
-    private final Schema schema;
-
-    public StringData(Object datum) {
-        this.datum = datum;
-        this.schema = new StringSchema();
-    }
-
-    @Override
-    public Schema schema() {
-        return this.schema;
-    }
-
-    @Override
-    public Object value() {
-        return this.datum;
-    }
-
-    @Override
-    public int intValue() {
-        throw new UnsupportedOperationException("Can't get int value for a 
string type data");
-    }
-
-    @Override
-    public long longValue() {
-        throw new UnsupportedOperationException("Can't get long value for a 
string type data");
-    }
-
-    @Override
-    public float floatValue() {
-        throw new UnsupportedOperationException("Can't get float value for a 
string type data");
-    }
-
-    @Override
-    public double doubleValue() {
-        throw new UnsupportedOperationException("Can't get double value for a 
string type data");
-    }
-
-    @Override
-    public boolean booleanValue() {
-        throw new UnsupportedOperationException("Can't get boolean value for a 
string type data");
-    }
-
-    @Override
-    public String strValue() {
-        return String.valueOf(datum);
-    }
-
-    @Override
-    public byte[] bytesValue() {
-        throw new UnsupportedOperationException("Can't get bytesValue for a 
string type data");
-    }
-
-    @Override
-    public List<Object> arrayValue() {
-        throw new UnsupportedOperationException("Can't get arrayValue for a 
string type data");
-    }
-
-    @Override
-    public Map<Object, Object> mapValue() {
-        throw new UnsupportedOperationException("Can't get mapValue for a 
string type data");
-    }
-
-    @Override
-    public Data getElement(int index) {
-        throw new UnsupportedOperationException("Can't getElement(index) on a 
string type data");
-    }
-
-    @Override
-    public Data getFieldData(String fldName) {
-        throw new UnsupportedOperationException("Can't getFieldData(fieldName) 
for a string type data");
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java 
b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
deleted file mode 100644
index 348fc0c..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.sql.data.string;
-
-import org.apache.samza.sql.api.data.Data;
-import org.apache.samza.sql.api.data.Schema;
-
-import java.util.Map;
-
-public class StringSchema implements Schema {
-    private Type type = Type.STRING;
-
-    @Override
-    public Type getType() {
-      return Type.STRING;
-    }
-
-    @Override
-    public Schema getElementType() {
-      throw new UnsupportedOperationException("Can't getElmentType with 
non-array schema: " + this.type);
-    }
-
-    @Override
-    public Schema getValueType() {
-        throw new UnsupportedOperationException("Can't getValueType with 
non-map schema: " + this.type);
-    }
-
-    @Override
-    public Map<String, Schema> getFields() {
-        throw new UnsupportedOperationException("Can't get field types with 
unknown schema type:" + this.type);
-    }
-
-    @Override
-    public Schema getFieldType(String fldName) {
-        throw new UnsupportedOperationException("Can't getFieldType with 
non-map/non-struct schema: " + this.type);
-    }
-
-    @Override
-    public Data read(Object object) {
-        return new StringData(object);
-    }
-
-    @Override
-    public Data transform(Data inputData) {
-        if (inputData.schema().getType() != this.type) {
-            throw new IllegalArgumentException("Can't transform a mismatched 
primitive type. this type:" + this.type
-                    + ", input type:" + inputData.schema().getType());
-        }
-        return inputData;
-    }
-
-    @Override
-    public boolean equals(Schema other) {
-        return other.getType() == this.type;
-    }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/f44a6929/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
index 986d688..7921d4f 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -82,8 +82,9 @@ public final class PartitionOp extends SimpleOperator 
implements TupleOperator {
 
   @Override
   public void process(Tuple tuple, SqlMessageCollector collector) throws 
Exception {
-    collector.send(new 
OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), 
tuple.getKey().value(),
-        
tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), 
tuple.getMessage().value()));
+    collector.send(new 
OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey(),
+        null /* TODO: when merge with Schema API changes, use: tuple
+             .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, 
tuple.getMessage()));
   }
 
 }

Reply via email to