Repository: flink
Updated Branches:
  refs/heads/master b08669abf -> 48791c347


Revert "[FLINK-2203] handling null values for RowSerializer"

This reverts commit f8e12b20d925c3f6f24769327d1da5d98affa679.

The commit had to be reverted because the RowSerializer is not in sync
with other comparators and serializers. See FLINK-2236.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48791c34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48791c34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48791c34

Branch: refs/heads/master
Commit: 48791c34776fe10373ef3abbc35d9b0fcfbda1e4
Parents: ff0a1a0
Author: Maximilian Michels <[email protected]>
Authored: Tue Oct 6 11:25:56 2015 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Tue Oct 6 17:16:55 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-table/pom.xml               |  8 ---
 .../api/table/typeinfo/RowSerializer.scala      | 67 ++++++-------------
 .../api/table/typeinfo/RowSerializerTest.scala  | 70 --------------------
 3 files changed, 19 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml 
b/flink-staging/flink-table/pom.xml
index 1a622aa..358e116 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -94,14 +94,6 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
index 02219c7..5e9613d 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -17,13 +17,9 @@
  */
 package org.apache.flink.api.table.typeinfo
 
-import java.util
-
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer
 import org.apache.flink.api.table.Row
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 
 /**
  * Serializer for [[Row]].
@@ -31,8 +27,6 @@ import org.apache.flink.core.memory.{DataInputView, 
DataOutputView}
 class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
   extends TypeSerializer[Row] {
 
-  private def getFieldSerializers = fieldSerializers
-
   override def isImmutableType: Boolean = false
 
   override def getLength: Int = -1
@@ -79,17 +73,11 @@ class RowSerializer(val fieldSerializers: 
Array[TypeSerializer[Any]])
 
   override def serialize(value: Row, target: DataOutputView) {
     val len = fieldSerializers.length
-    var index = 0
-    while (index < len) {
-      val o: AnyRef = value.productElement(index).asInstanceOf[AnyRef]
-      if (o == null) {
-        target.writeBoolean(true)
-      } else {
-        target.writeBoolean(false)
-        val serializer = fieldSerializers(index)
-        serializer.serialize(value.productElement(index), target)
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      val serializer = fieldSerializers(i)
+      serializer.serialize(value.productElement(i), target)
+      i += 1
     }
   }
 
@@ -100,17 +88,11 @@ class RowSerializer(val fieldSerializers: 
Array[TypeSerializer[Any]])
       throw new RuntimeException("Row arity of reuse and fields do not match.")
     }
 
-    var index = 0
-    while (index < len) {
-      val isNull: Boolean = source.readBoolean
-      if (isNull) {
-        reuse.setField(index, null)
-      } else {
-        val field = reuse.productElement(index).asInstanceOf[AnyRef]
-        val serializer: TypeSerializer[Any] = fieldSerializers(index)
-        reuse.setField(index, serializer.deserialize(field, source))
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      val field = reuse.productElement(i).asInstanceOf[AnyRef]
+      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
+      i += 1
     }
     reuse
   }
@@ -119,17 +101,10 @@ class RowSerializer(val fieldSerializers: 
Array[TypeSerializer[Any]])
     val len = fieldSerializers.length
 
     val result = new Row(len)
-
-    var index = 0
-    while (index < len) {
-      val isNull: Boolean = source.readBoolean()
-      if (isNull) {
-        result.setField(index, null)
-      } else {
-        val serializer: TypeSerializer[Any] = fieldSerializers(index)
-        result.setField(index, serializer.deserialize(source))
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      result.setField(i, fieldSerializers(i).deserialize(source))
+      i += 1
     }
     result
   }
@@ -138,11 +113,7 @@ class RowSerializer(val fieldSerializers: 
Array[TypeSerializer[Any]])
     val len = fieldSerializers.length
     var i = 0
     while (i < len) {
-      val isNull = source.readBoolean()
-      target.writeBoolean(isNull)
-      if (!isNull) {
-        fieldSerializers(i).copy(source, target)
-      }
+      fieldSerializers(i).copy(source, target)
       i += 1
     }
   }
@@ -151,7 +122,7 @@ class RowSerializer(val fieldSerializers: 
Array[TypeSerializer[Any]])
     any match {
       case otherRS: RowSerializer =>
         otherRS.canEqual(this) &&
-        fieldSerializers.sameElements(otherRS.fieldSerializers)
+          fieldSerializers.sameElements(otherRS.fieldSerializers)
       case _ => false
     }
   }
@@ -161,6 +132,6 @@ class RowSerializer(val fieldSerializers: 
Array[TypeSerializer[Any]])
   }
 
   override def hashCode(): Int = {
-    util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
+    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
deleted file mode 100644
index cff276a..0000000
--- 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.{SerializerTestInstance, 
TypeSerializer}
-import org.apache.flink.api.table.Row
-import org.junit.Assert._
-import org.junit.Test
-
-class RowSerializerTest {
-
-  class RowSerializerTestInstance(serializer: TypeSerializer[Row],
-                                  testData: Array[Row])
-    extends SerializerTestInstance(serializer, classOf[Row], -1, testData: _*) 
{
-
-    override protected def deepEquals(message: String, should: Row, is: Row): 
Unit = {
-      val arity = should.productArity
-      assertEquals(message, arity, is.productArity)
-      var index = 0
-      while (index < arity) {
-        val copiedValue: Any = should.productElement(index)
-        val element: Any = is.productElement(index)
-        assertEquals(message, element, copiedValue)
-        index += 1
-      }
-    }
-  }
-
-  @Test
-  def testRowSerializer(): Unit ={
-
-    val rowInfo: TypeInformation[Row] = new RowTypeInfo(
-      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), 
Seq("id", "name"))
-
-    val row1 = new Row(2)
-    row1.setField(0, 1)
-    row1.setField(1, "a")
-
-    val row2 = new Row(2)
-    row2.setField(0, 2)
-    row2.setField(1, null)
-
-    val testData: Array[Row] = Array(row1, row2)
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new 
ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer,testData)
-
-    testInstance.testAll()
-  }
-
-}

Reply via email to