Repository: flink Updated Branches: refs/heads/master b08669abf -> 48791c347
Revert "[FLINK-2203] handling null values for RowSerializer" This reverts commit f8e12b20d925c3f6f24769327d1da5d98affa679. The commit had to be reverted because the RowSerializer is not in sync with other comparators and serializers. See FLINK-2236. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48791c34 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48791c34 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48791c34 Branch: refs/heads/master Commit: 48791c34776fe10373ef3abbc35d9b0fcfbda1e4 Parents: ff0a1a0 Author: Maximilian Michels <[email protected]> Authored: Tue Oct 6 11:25:56 2015 +0200 Committer: Maximilian Michels <[email protected]> Committed: Tue Oct 6 17:16:55 2015 +0200 ---------------------------------------------------------------------- flink-staging/flink-table/pom.xml | 8 --- .../api/table/typeinfo/RowSerializer.scala | 67 ++++++------------- .../api/table/typeinfo/RowSerializerTest.scala | 70 -------------------- 3 files changed, 19 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml index 1a622aa..358e116 100644 --- a/flink-staging/flink-table/pom.xml +++ b/flink-staging/flink-table/pom.xml @@ -94,14 +94,6 @@ under the License. 
<scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala index 02219c7..5e9613d 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala @@ -17,13 +17,9 @@ */ package org.apache.flink.api.table.typeinfo -import java.util - -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.common.typeutils.base.BooleanSerializer import org.apache.flink.api.table.Row -import org.apache.flink.core.memory.{DataInputView, DataOutputView} - +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.core.memory.{DataOutputView, DataInputView} /** * Serializer for [[Row]]. 
@@ -31,8 +27,6 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView} class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]]) extends TypeSerializer[Row] { - private def getFieldSerializers = fieldSerializers - override def isImmutableType: Boolean = false override def getLength: Int = -1 @@ -79,17 +73,11 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]]) override def serialize(value: Row, target: DataOutputView) { val len = fieldSerializers.length - var index = 0 - while (index < len) { - val o: AnyRef = value.productElement(index).asInstanceOf[AnyRef] - if (o == null) { - target.writeBoolean(true) - } else { - target.writeBoolean(false) - val serializer = fieldSerializers(index) - serializer.serialize(value.productElement(index), target) - } - index += 1 + var i = 0 + while (i < len) { + val serializer = fieldSerializers(i) + serializer.serialize(value.productElement(i), target) + i += 1 } } @@ -100,17 +88,11 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]]) throw new RuntimeException("Row arity of reuse and fields do not match.") } - var index = 0 - while (index < len) { - val isNull: Boolean = source.readBoolean - if (isNull) { - reuse.setField(index, null) - } else { - val field = reuse.productElement(index).asInstanceOf[AnyRef] - val serializer: TypeSerializer[Any] = fieldSerializers(index) - reuse.setField(index, serializer.deserialize(field, source)) - } - index += 1 + var i = 0 + while (i < len) { + val field = reuse.productElement(i).asInstanceOf[AnyRef] + reuse.setField(i, fieldSerializers(i).deserialize(field, source)) + i += 1 } reuse } @@ -119,17 +101,10 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]]) val len = fieldSerializers.length val result = new Row(len) - - var index = 0 - while (index < len) { - val isNull: Boolean = source.readBoolean() - if (isNull) { - result.setField(index, null) - } else { - val serializer: TypeSerializer[Any] = 
fieldSerializers(index) - result.setField(index, serializer.deserialize(source)) - } - index += 1 + var i = 0 + while (i < len) { + result.setField(i, fieldSerializers(i).deserialize(source)) + i += 1 } result } @@ -138,11 +113,7 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]]) val len = fieldSerializers.length var i = 0 while (i < len) { - val isNull = source.readBoolean() - target.writeBoolean(isNull) - if (!isNull) { - fieldSerializers(i).copy(source, target) - } + fieldSerializers(i).copy(source, target) i += 1 } } @@ -151,7 +122,7 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]]) any match { case otherRS: RowSerializer => otherRS.canEqual(this) && - fieldSerializers.sameElements(otherRS.fieldSerializers) + fieldSerializers.sameElements(otherRS.fieldSerializers) case _ => false } } @@ -161,6 +132,6 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]]) } override def hashCode(): Int = { - util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]]) + java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]]) } } http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala deleted file mode 100644 index cff276a..0000000 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeinfo - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer} -import org.apache.flink.api.table.Row -import org.junit.Assert._ -import org.junit.Test - -class RowSerializerTest { - - class RowSerializerTestInstance(serializer: TypeSerializer[Row], - testData: Array[Row]) - extends SerializerTestInstance(serializer, classOf[Row], -1, testData: _*) { - - override protected def deepEquals(message: String, should: Row, is: Row): Unit = { - val arity = should.productArity - assertEquals(message, arity, is.productArity) - var index = 0 - while (index < arity) { - val copiedValue: Any = should.productElement(index) - val element: Any = is.productElement(index) - assertEquals(message, element, copiedValue) - index += 1 - } - } - } - - @Test - def testRowSerializer(): Unit ={ - - val rowInfo: TypeInformation[Row] = new RowTypeInfo( - Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name")) - - val row1 = new Row(2) - row1.setField(0, 1) - row1.setField(1, "a") - - val row2 = new Row(2) - row2.setField(0, 2) - row2.setField(1, null) - - val testData: Array[Row] = Array(row1, row2) - - val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new 
ExecutionConfig) - - val testInstance = new RowSerializerTestInstance(rowSerializer,testData) - - testInstance.testAll() - } - -}
