[FLINK-5189] [table] Delete Row and its related classes from flink-table. This closes #3004.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9e6ec86 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9e6ec86 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9e6ec86 Branch: refs/heads/master Commit: a9e6ec863a0879b0c8dea199535380a6d42a3121 Parents: 86f8a25 Author: tonycox <[email protected]> Authored: Wed Dec 14 12:41:51 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Thu Dec 15 11:36:40 2016 +0100 ---------------------------------------------------------------------- .../scala/org/apache/flink/api/table/Row.scala | 38 - .../table/runtime/io/RowCsvInputFormat.scala | 181 ---- .../table/typeutils/NullAwareComparator.scala | 218 ----- .../api/table/typeutils/NullMaskUtils.scala | 98 --- .../api/table/typeutils/RowComparator.scala | 425 --------- .../api/table/typeutils/RowSerializer.scala | 209 ----- .../flink/api/table/typeutils/RowTypeInfo.scala | 108 --- .../runtime/io/RowCsvInputFormatTest.scala | 882 ------------------- .../api/table/typeutils/RowComparatorTest.scala | 136 --- .../RowComparatorWithManyFieldsTest.scala | 82 -- .../api/table/typeutils/RowSerializerTest.scala | 194 ---- 11 files changed, 2571 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala deleted file mode 100644 index e3baab3..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -/** - * This is used for executing Table API operations. We use manually generated - * TypeInfo to check the field types and create serializers and comparators. - */ -class Row(arity: Int) extends Product { - - private val fields = new Array[Any](arity) - - def productArity = fields.length - - def productElement(i: Int): Any = fields(i) - - def setField(i: Int, value: Any): Unit = fields(i) = value - - def canEqual(that: Any) = false - - override def toString = fields.mkString(",") - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala deleted file mode 100644 index b0ab801..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime.io - -import org.apache.flink.annotation.Internal -import org.apache.flink.api.common.io.ParseException -import org.apache.flink.api.java.io.CsvInputFormat -import org.apache.flink.api.java.io.CsvInputFormat.{DEFAULT_FIELD_DELIMITER, DEFAULT_LINE_DELIMITER, createDefaultMask, toBooleanMask} -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.runtime.io.RowCsvInputFormat.extractTypeClasses -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.core.fs.Path -import org.apache.flink.types.parser.FieldParser -import org.apache.flink.types.parser.FieldParser.ParseErrorState - -@Internal -@SerialVersionUID(1L) -class RowCsvInputFormat( - filePath: Path, - rowTypeInfo: RowTypeInfo, - lineDelimiter: String = DEFAULT_LINE_DELIMITER, - fieldDelimiter: String = DEFAULT_FIELD_DELIMITER, - includedFieldsMask: Array[Boolean] = null, - emptyColumnAsNull: Boolean = false) - extends CsvInputFormat[Row](filePath) { - - if (rowTypeInfo.getArity == 0) { - throw new IllegalArgumentException("Row arity must be greater than 0.") - } - private val arity = rowTypeInfo.getArity - private lazy val defaultFieldMask = createDefaultMask(arity) - private val fieldsMask = Option(includedFieldsMask).getOrElse(defaultFieldMask) - - // prepare CsvInputFormat - setDelimiter(lineDelimiter) - setFieldDelimiter(fieldDelimiter) - setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo)) - - def this( - filePath: Path, - rowTypeInfo: RowTypeInfo, - lineDelimiter: String, - fieldDelimiter: String, - includedFieldsMask: Array[Int]) { - this( - filePath, - rowTypeInfo, - lineDelimiter, - fieldDelimiter, - if (includedFieldsMask == null) { - null - } else { - toBooleanMask(includedFieldsMask) - }) - } - - def this( - filePath: Path, - rowTypeInfo: RowTypeInfo, - includedFieldsMask: Array[Int]) { - this( - filePath, - rowTypeInfo, - DEFAULT_LINE_DELIMITER, - DEFAULT_FIELD_DELIMITER, - includedFieldsMask) - } - - def fillRecord(reuse: Row, parsedValues: Array[AnyRef]): Row = { - val reuseRow = if (reuse == null) { - new Row(arity) - } else { - reuse - } - var i: Int = 0 - while (i < parsedValues.length) { - reuse.setField(i, parsedValues(i)) - i += 1 - } - reuseRow - } - - @throws[ParseException] - override protected def parseRecord( - holders: Array[AnyRef], - bytes: Array[Byte], - offset: Int, - numBytes: Int) - : Boolean = { - val fieldDelimiter = this.getFieldDelimiter - val fieldIncluded: Array[Boolean] = this.fieldIncluded - - var startPos = offset - val limit = offset + numBytes - - var field = 0 - var output = 0 - while (field < fieldIncluded.length) { - - // check valid start position - if (startPos >= limit) { - if (isLenient) { - return false - } else { - throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)) - } - } - - if (fieldIncluded(field)) { - // parse field - val parser: FieldParser[AnyRef] = this.getFieldParsers()(output) - .asInstanceOf[FieldParser[AnyRef]] - val latestValidPos = startPos - startPos = parser.resetErrorStateAndParse( - bytes, - startPos, - limit, - fieldDelimiter, - holders(output)) - - if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) { - // the error state EMPTY_COLUMN is ignored - if (parser.getErrorState ne ParseErrorState.EMPTY_COLUMN) { - throw new ParseException(s"Parsing error for column $field of row '" - + new String(bytes, offset, numBytes) - + s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.") - } - } - holders(output) = parser.getLastResult - - // check parse result: - // the result is null if it is invalid - // or empty with emptyColumnAsNull enabled - if (startPos < 0 || - (emptyColumnAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_COLUMN))) { - holders(output) = null - startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter) - } - output += 1 - } else { - // skip field - startPos = skipFields(bytes, startPos, limit, fieldDelimiter) - } - - // check if something went wrong - if (startPos < 0) { - throw new ParseException(s"Unexpected parser position for column $field of row '" - + new String(bytes, offset, numBytes) + "'") - } - - field += 1 - } - true - } -} - -object RowCsvInputFormat { - - private def extractTypeClasses(rowTypeInfo: RowTypeInfo): Array[Class[_]] = { - val classes = for (i <- 0 until rowTypeInfo.getArity) - yield rowTypeInfo.getTypeAt(i).getTypeClass - classes.toArray - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala deleted file mode 100644 index 86a768d..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator} -import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment} - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer - -/** - * Null-aware comparator that wraps a comparator which does not support null references. - * - * NOTE: This class assumes to be used within a composite type comparator (such - * as [[RowComparator]]) that handles serialized comparison. - */ -class NullAwareComparator[T]( - val wrappedComparator: TypeComparator[T], - val order: Boolean) - extends TypeComparator[T] { - - // number of flat fields - private val flatFields = wrappedComparator.getFlatComparators.length - - // stores the null for reference comparison - private var nullReference = false - - override def hash(record: T): Int = { - if (record != null) { - wrappedComparator.hash(record) - } - else { - 0 - } - } - - override def getNormalizeKeyLen: Int = { - val len = wrappedComparator.getNormalizeKeyLen - if (len == Integer.MAX_VALUE) { - Integer.MAX_VALUE - } - else { - len + 1 // add one for a null byte - } - } - - override def putNormalizedKey( - record: T, - target: MemorySegment, - offset: Int, - numBytes: Int) - : Unit = { - if (numBytes > 0) { - // write a null byte with padding - if (record == null) { - target.putBoolean(offset, false) - // write padding - var j = 0 - while (j < numBytes - 1) { - target.put(offset + 1 + j, 0.toByte) - j += 1 - } - } - // write a non-null byte with key - else { - target.putBoolean(offset, true) - // write key - wrappedComparator.putNormalizedKey(record, target, offset + 1, numBytes - 1) - } - } - } - - override def invertNormalizedKey(): Boolean = wrappedComparator.invertNormalizedKey() - - override def supportsSerializationWithKeyNormalization(): Boolean = false - - override def writeWithKeyNormalization(record: T, target: DataOutputView): Unit = - throw new UnsupportedOperationException("Record serialization with leading normalized keys" + - " not supported.") - - override def readWithKeyDenormalization(reuse: T, source: DataInputView): T = - throw new UnsupportedOperationException("Record deserialization with leading normalized keys" + - " not supported.") - - override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean = - wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes - 1) - - override def setReference(toCompare: T): Unit = { - if (toCompare == null) { - nullReference = true - } - else { - nullReference = false - wrappedComparator.setReference(toCompare) - } - } - - override def compare(first: T, second: T): Int = { - // both values are null -> equality - if (first == null && second == null) { - 0 - } - // first value is null -> inequality - // but order is considered - else if (first == null) { - if (order) -1 else 1 - } - // second value is null -> inequality - // but order is considered - else if (second == null) { - if (order) 1 else -1 - } - // no null values - else { - wrappedComparator.compare(first, second) - } - } - - override def compareToReference(referencedComparator: TypeComparator[T]): Int = { - val otherComparator = referencedComparator.asInstanceOf[NullAwareComparator[T]] - val otherNullReference = otherComparator.nullReference - // both values are null -> equality - if (nullReference && otherNullReference) { - 0 - } - // first value is null -> inequality - // but order is considered - else if (nullReference) { - if (order) 1 else -1 - } - // second value is null -> inequality - // but order is considered - else if (otherNullReference) { - if (order) -1 else 1 - } - // no null values - else { - wrappedComparator.compareToReference(otherComparator.wrappedComparator) - } - } - - override def supportsNormalizedKey(): Boolean = wrappedComparator.supportsNormalizedKey() - - override def equalToReference(candidate: T): Boolean = { - // both values are null - if (candidate == null && nullReference) { - true - } - // one value is null - else if (candidate == null || nullReference) { - false - } - // no null value - else { - wrappedComparator.equalToReference(candidate) - } - } - - override def duplicate(): TypeComparator[T] = { - new NullAwareComparator[T](wrappedComparator.duplicate(), order) - } - - override def extractKeys(record: Any, target: Array[AnyRef], index: Int): Int = { - if (record == null) { - var i = 0 - while (i < flatFields) { - target(index + i) = null - i += 1 - } - flatFields - } - else { - wrappedComparator.extractKeys(record, target, index) - } - } - - - override def getFlatComparators: Array[TypeComparator[_]] = { - // determine the flat comparators and wrap them again in null-aware comparators - val flatComparators = new ArrayBuffer[TypeComparator[_]]() - wrappedComparator match { - case ctc: CompositeTypeComparator[_] => ctc.getFlatComparator(flatComparators) - case c: TypeComparator[_] => flatComparators += c - } - val wrappedComparators = flatComparators.map { c => - new NullAwareComparator[Any](c.asInstanceOf[TypeComparator[Any]], order) - } - wrappedComparators.toArray[TypeComparator[_]] - } - - /** - * This method is not implemented here. It must be implemented by the comparator this class - * is contained in (e.g. RowComparator). - * - * @param firstSource The input view containing the first record. - * @param secondSource The input view containing the second record. - * @return An integer defining the oder among the objects in the same way as - * { @link java.util.Comparator#compare(Object, Object)}. - */ - override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = - throw new UnsupportedOperationException("Comparator does not support null-aware serialized " + - "comparision.") -} http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala deleted file mode 100644 index dcdc775..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.table.Row -import org.apache.flink.core.memory.{DataInputView, DataOutputView} - -object NullMaskUtils { - - def writeNullMask(len: Int, value: Row, target: DataOutputView): Unit = { - var b = 0x00 - var bytePos = 0 - - var fieldPos = 0 - var numPos = 0 - while (fieldPos < len) { - b = 0x00 - // set bits in byte - bytePos = 0 - numPos = Math.min(8, len - fieldPos) - while (bytePos < numPos) { - b = b << 1 - // set bit if field is null - if(value.productElement(fieldPos + bytePos) == null) { - b |= 0x01 - } - bytePos += 1 - } - fieldPos += numPos - // shift bits if last byte is not completely filled - b <<= (8 - bytePos) - // write byte - target.writeByte(b) - } - } - - def readIntoNullMask(len: Int, source: DataInputView, nullMask: Array[Boolean]): Unit = { - var b = 0x00 - var bytePos = 0 - - var fieldPos = 0 - var numPos = 0 - while (fieldPos < len) { - // read byte - b = source.readUnsignedByte() - bytePos = 0 - numPos = Math.min(8, len - fieldPos) - while (bytePos < numPos) { - nullMask(fieldPos + bytePos) = (b & 0x80) > 0 - b = b << 1 - bytePos += 1 - } - fieldPos += numPos - } - } - - def readIntoAndCopyNullMask( - len: Int, - source: DataInputView, - target: DataOutputView, - nullMask: Array[Boolean]): Unit = { - var b = 0x00 - var bytePos = 0 - - var fieldPos = 0 - var numPos = 0 - while (fieldPos < len) { - // read byte - b = source.readUnsignedByte() - // copy byte - target.writeByte(b) - bytePos = 0 - numPos = Math.min(8, len - fieldPos) - while (bytePos < numPos) { - nullMask(fieldPos + bytePos) = (b & 0x80) > 0 - b = b << 1 - bytePos += 1 - } - fieldPos += numPos - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala deleted file mode 100644 index 8bbe4d8..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala +++ /dev/null @@ -1,425 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeutils - -import java.util - -import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator, TypeSerializer} -import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.typeutils.NullMaskUtils.readIntoNullMask -import org.apache.flink.api.table.typeutils.RowComparator.{createAuxiliaryFields, makeNullAware} -import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment} -import org.apache.flink.types.KeyFieldOutOfBoundsException - -/** - * Comparator for [[Row]]. - */ -class RowComparator private ( - /** the number of fields of the Row */ - val numberOfFields: Int, - /** key positions describe which fields are keys in what order */ - val keyPositions: Array[Int], - /** null-aware comparators for the key fields, in the same order as the key fields */ - val comparators: Array[NullAwareComparator[Any]], - /** serializers to deserialize the first n fields for comparison */ - val serializers: Array[TypeSerializer[Any]], - /** auxiliary fields for normalized key support */ - private val auxiliaryFields: (Array[Int], Int, Int, Boolean)) - extends CompositeTypeComparator[Row] with Serializable { - - // null masks for serialized comparison - private val nullMask1 = new Array[Boolean](numberOfFields) - private val nullMask2 = new Array[Boolean](numberOfFields) - - // cache for the deserialized key field objects - @transient - private lazy val deserializedKeyFields1: Array[Any] = instantiateDeserializationFields() - - @transient - private lazy val deserializedKeyFields2: Array[Any] = instantiateDeserializationFields() - - // create auxiliary fields - private val normalizedKeyLengths: Array[Int] = auxiliaryFields._1 - private val numLeadingNormalizableKeys: Int = auxiliaryFields._2 - private val normalizableKeyPrefixLen: Int = auxiliaryFields._3 - private val invertNormKey: Boolean = auxiliaryFields._4 - - /** - * Intermediate constructor for creating auxiliary fields. - */ - def this( - numberOfFields: Int, - keyPositions: Array[Int], - comparators: Array[NullAwareComparator[Any]], - serializers: Array[TypeSerializer[Any]]) = { - this( - numberOfFields, - keyPositions, - comparators, - serializers, - createAuxiliaryFields(keyPositions, comparators)) - } - - /** - * General constructor for RowComparator. - * - * @param numberOfFields the number of fields of the Row - * @param keyPositions key positions describe which fields are keys in what order - * @param comparators non-null-aware comparators for the key fields, in the same order as - * the key fields - * @param serializers serializers to deserialize the first n fields for comparison - * @param orders sorting orders for the fields - */ - def this( - numberOfFields: Int, - keyPositions: Array[Int], - comparators: Array[TypeComparator[Any]], - serializers: Array[TypeSerializer[Any]], - orders: Array[Boolean]) = { - this( - numberOfFields, - keyPositions, - makeNullAware(comparators, orders), - serializers) - } - - private def instantiateDeserializationFields(): Array[Any] = { - val newFields = new Array[Any](serializers.length) - var i = 0 - while (i < serializers.length) { - newFields(i) = serializers(i).createInstance() - i += 1 - } - newFields - } - - // -------------------------------------------------------------------------------------------- - // Comparator Methods - // -------------------------------------------------------------------------------------------- - - override def compareToReference(referencedComparator: TypeComparator[Row]): Int = { - val other: RowComparator = referencedComparator.asInstanceOf[RowComparator] - var i = 0 - try { - while (i < keyPositions.length) { - val comparator = comparators(i) - val otherComparator = other.comparators(i) - - val cmp = comparator.compareToReference(otherComparator) - if (cmp != 0) { - return cmp - } - i = i + 1 - } - 0 - } - catch { - case iobex: IndexOutOfBoundsException => - throw new KeyFieldOutOfBoundsException(keyPositions(i)) - } - } - - override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = { - val len = serializers.length - val keyLen = keyPositions.length - - readIntoNullMask(numberOfFields, firstSource, nullMask1) - readIntoNullMask(numberOfFields, secondSource, nullMask2) - - // deserialize - var i = 0 - while (i < len) { - val serializer = serializers(i) - - // deserialize field 1 - if (!nullMask1(i)) { - deserializedKeyFields1(i) = serializer.deserialize(deserializedKeyFields1(i), firstSource) - } - - // deserialize field 2 - if (!nullMask2(i)) { - deserializedKeyFields2(i) = serializer.deserialize(deserializedKeyFields2(i), secondSource) - } - - i += 1 - } - - // compare - i = 0 - while (i < keyLen) { - val keyPos = keyPositions(i) - val comparator = comparators(i) - - val isNull1 = nullMask1(keyPos) - val isNull2 = nullMask2(keyPos) - - var cmp = 0 - // both values are null -> equality - if (isNull1 && isNull2) { - cmp = 0 - } - // first value is null -> inequality - else if (isNull1) { - cmp = comparator.compare(null, deserializedKeyFields2(keyPos)) - } - // second value is null -> inequality - else if (isNull2) { - cmp = comparator.compare(deserializedKeyFields1(keyPos), null) - } - // no null values - else { - cmp = comparator.compare(deserializedKeyFields1(keyPos), deserializedKeyFields2(keyPos)) - } - - if (cmp != 0) { - return cmp - } - - i += 1 - } - 0 - } - - override def supportsNormalizedKey(): Boolean = numLeadingNormalizableKeys > 0 - - override def getNormalizeKeyLen: Int = normalizableKeyPrefixLen - - override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean = - numLeadingNormalizableKeys < keyPositions.length || - normalizableKeyPrefixLen == Integer.MAX_VALUE || - normalizableKeyPrefixLen > keyBytes - - override def invertNormalizedKey(): Boolean = invertNormKey - - override def supportsSerializationWithKeyNormalization(): Boolean = false - - override def writeWithKeyNormalization(record: Row, target: DataOutputView): Unit = - throw new UnsupportedOperationException("Record serialization with leading normalized keys " + - "not supported.") - - override def readWithKeyDenormalization(reuse: Row, source: DataInputView): Row = - throw new UnsupportedOperationException("Record deserialization with leading normalized keys " + - "not supported.") - - override def duplicate(): TypeComparator[Row] = { - // copy comparator and serializer factories - val comparatorsCopy = comparators.map(_.duplicate().asInstanceOf[NullAwareComparator[Any]]) - val serializersCopy = serializers.map(_.duplicate()) - - new RowComparator( - numberOfFields, - keyPositions, - comparatorsCopy, - serializersCopy, - auxiliaryFields) - } - - override def hash(value: Row): Int = { - var code: Int = 0 - var i = 0 - try { - while(i < keyPositions.length) { - code *= TupleComparatorBase.HASH_SALT(i & 0x1F) - val element = value.productElement(keyPositions(i)) // element can be null - code += comparators(i).hash(element) - i += 1 - } - } catch { - case iobex: IndexOutOfBoundsException => - throw new KeyFieldOutOfBoundsException(keyPositions(i)) - } - code - } - - override def setReference(toCompare: Row) { - var i = 0 - try { - while(i < keyPositions.length) { - val comparator = comparators(i) - val element = toCompare.productElement(keyPositions(i)) - comparator.setReference(element) // element can be null - i += 1 - } - } catch { - case iobex: IndexOutOfBoundsException => - throw new KeyFieldOutOfBoundsException(keyPositions(i)) - } - } - - override def equalToReference(candidate: Row): Boolean = { - var i = 0 - try { - while(i < keyPositions.length) { - val comparator = comparators(i) - val element = candidate.productElement(keyPositions(i)) // element can be null - // check if reference is not equal - if (!comparator.equalToReference(element)) { - return false - } - i += 1 - } - } catch { - case iobex: IndexOutOfBoundsException => - throw new KeyFieldOutOfBoundsException(keyPositions(i)) - } - true - } - - override def compare(first: Row, second: Row): Int = { - var i = 0 - try { - while(i < keyPositions.length) { - val keyPos: Int = keyPositions(i) - val comparator = comparators(i) - val firstElement = first.productElement(keyPos) // element can be null - val secondElement = second.productElement(keyPos) // element can be null - - val cmp = comparator.compare(firstElement, secondElement) - if (cmp != 0) { - return cmp - } - i += 1 - } - } catch { - case iobex: IndexOutOfBoundsException => - throw new KeyFieldOutOfBoundsException(keyPositions(i)) - } - 0 - } - - override def putNormalizedKey( - record: Row, - target: MemorySegment, - offset: Int, - numBytes: Int) - : Unit = { - var bytesLeft = numBytes - var currentOffset = offset - - var i = 0 - while (i < numLeadingNormalizableKeys && bytesLeft > 0) { - var len = normalizedKeyLengths(i) - len = if (bytesLeft >= len) len else bytesLeft - - val comparator = comparators(i) - val element = record.productElement(keyPositions(i)) // element can be null - // write key - comparator.putNormalizedKey(element, target, currentOffset, len) - - bytesLeft -= len - currentOffset += len - i += 1 - } - } - - override def getFlatComparator(flatComparators: util.List[TypeComparator[_]]): Unit = - comparators.foreach { c => - c.getFlatComparators.foreach { fc => - flatComparators.add(fc) - } - } - - override def extractKeys(record: Any, target: Array[AnyRef], index: Int): Int = { - val len = comparators.length - var localIndex = index - var i = 0 - while (i < len) { - val element = record.asInstanceOf[Row].productElement(keyPositions(i)) // element can be null - localIndex += comparators(i).extractKeys(element, target, localIndex) - i += 1 - } - localIndex - index - } -} - -object RowComparator { - private def makeNullAware( - comparators: Array[TypeComparator[Any]], - orders: Array[Boolean]) - : Array[NullAwareComparator[Any]] = - comparators - .zip(orders) - .map { case (comp, order) => - new NullAwareComparator[Any]( - comp, - order) - } - - /** - * @return creates auxiliary fields for normalized key support - */ - private def createAuxiliaryFields( - keyPositions: Array[Int], - comparators: Array[NullAwareComparator[Any]]) - : (Array[Int], Int, Int, Boolean) = { - - val normalizedKeyLengths = new Array[Int](keyPositions.length) - var numLeadingNormalizableKeys = 0 - var normalizableKeyPrefixLen = 0 - var inverted = false - - var i = 0 - while (i < keyPositions.length) { - val k = comparators(i) - // as long as the leading keys support normalized keys, we can build up the composite key - if (k.supportsNormalizedKey()) { - if (i == 0) { - // the first comparator decides whether we need to invert the key direction - inverted = k.invertNormalizedKey() - } - else if (k.invertNormalizedKey() != inverted) { - // if a successor does not agree on the inversion direction, it cannot be part of the - // normalized key - return (normalizedKeyLengths, - numLeadingNormalizableKeys, - normalizableKeyPrefixLen, - inverted) - } - numLeadingNormalizableKeys += 1 - val len = k.getNormalizeKeyLen - if (len < 0) { - throw new RuntimeException("Comparator " + k.getClass.getName + - " specifies an invalid length for the normalized key: " + len) - } - normalizedKeyLengths(i) = len - normalizableKeyPrefixLen += len - if (normalizableKeyPrefixLen < 0) { - // overflow, which means we are out of budget for normalized key space anyways - return (normalizedKeyLengths, - numLeadingNormalizableKeys, - Integer.MAX_VALUE, - inverted) - } - } - else { - return (normalizedKeyLengths, - numLeadingNormalizableKeys, - normalizableKeyPrefixLen, - inverted) - } - i += 1 - } - (normalizedKeyLengths, - numLeadingNormalizableKeys, - normalizableKeyPrefixLen, - inverted) - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala deleted file mode 100644 index 825a99c..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.typeutils.NullMaskUtils.{writeNullMask, readIntoNullMask, readIntoAndCopyNullMask} -import org.apache.flink.core.memory.{DataInputView, DataOutputView} - -/** - * Serializer for [[Row]]. - */ -class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]]) - extends TypeSerializer[Row] { - - private val nullMask = new Array[Boolean](fieldSerializers.length) - - override def isImmutableType: Boolean = false - - override def getLength: Int = -1 - - override def duplicate: RowSerializer = { - val duplicateFieldSerializers = fieldSerializers.map(_.duplicate()) - new RowSerializer(duplicateFieldSerializers) - } - - override def createInstance: Row = { - new Row(fieldSerializers.length) - } - - override def copy(from: Row, reuse: Row): Row = { - val len = fieldSerializers.length - - // cannot reuse, do a non-reuse copy - if (reuse == null) { - return copy(from) - } - - if (from.productArity != len || reuse.productArity != len) { - throw new RuntimeException("Row arity of reuse or from is incompatible with this " + - "RowSerializer.") - } - - var i = 0 - while (i < len) { - val fromField = from.productElement(i) - if (fromField != null) { - val reuseField = reuse.productElement(i) - if (reuseField != null) { - val copy = fieldSerializers(i).copy(fromField, reuseField) - reuse.setField(i, copy) - } - else { - val copy = fieldSerializers(i).copy(fromField) - reuse.setField(i, copy) - } - } - else { - reuse.setField(i, null) - } - i += 1 - } - reuse - } - - override def copy(from: Row): Row = { - val len = fieldSerializers.length - - if (from.productArity != len) { - throw new RuntimeException("Row arity of from does not match serializers.") - } - val result = new Row(len) - var i = 0 - while (i < len) { - val fromField = from.productElement(i).asInstanceOf[AnyRef] - if (fromField != null) { - val copy = fieldSerializers(i).copy(fromField) - result.setField(i, copy) - } - else { - result.setField(i, null) - } - i += 1 - } - result - } - - override def serialize(value: Row, target: DataOutputView) { - val len = fieldSerializers.length - - if (value.productArity != len) { - throw new RuntimeException("Row arity of value does not match serializers.") - } - - // write a null mask - writeNullMask(len, value, target) - - // serialize non-null fields - var i = 0 - while (i < len) { - val o = value.productElement(i).asInstanceOf[AnyRef] - if (o != null) { - val serializer = fieldSerializers(i) - serializer.serialize(value.productElement(i), target) - } - i += 1 - } - } - - override def deserialize(reuse: Row, source: DataInputView): Row = { - val len = fieldSerializers.length - - if (reuse.productArity != len) { - throw new RuntimeException("Row arity of reuse does not match serializers.") - } - - // read null mask - readIntoNullMask(len, source, nullMask) - - // read non-null fields - var i = 0 - while (i < len) { - if (nullMask(i)) { - reuse.setField(i, null) - } - else { - val reuseField = reuse.productElement(i).asInstanceOf[AnyRef] - if (reuseField != null) { - reuse.setField(i, fieldSerializers(i).deserialize(reuseField, source)) - } - else { - reuse.setField(i, fieldSerializers(i).deserialize(source)) - } - } - i += 1 - } - reuse - } - - override def deserialize(source: DataInputView): Row = { - val len = fieldSerializers.length - - val result = new Row(len) - - // read null mask - readIntoNullMask(len, source, nullMask) - - // read non-null fields - var i = 0 - while (i < len) { - if (nullMask(i)) { - result.setField(i, null) - } - else { - result.setField(i, fieldSerializers(i).deserialize(source)) - } - i += 1 - } - result - } - - override def copy(source: DataInputView, target: DataOutputView): Unit = { - val len = fieldSerializers.length - - // copy null mask - readIntoAndCopyNullMask(len, source, target, nullMask) - - // read non-null fields - var i = 0 - while (i < len) { - if (!nullMask(i)) { - fieldSerializers(i).copy(source, target) - } - i += 1 - } - } - - override def equals(any: Any): Boolean = { - any match { - case otherRS: RowSerializer => - otherRS.canEqual(this) && - fieldSerializers.sameElements(otherRS.fieldSerializers) - case _ => false - } - } - - override def canEqual(obj: AnyRef): Boolean = { - obj.isInstanceOf[RowSerializer] - } - - override def hashCode(): Int = { - java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]]) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala deleted file mode 100644 index 711bb49..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType.TypeComparatorBuilder -import org.apache.flink.api.common.typeutils.TypeComparator -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo - -import scala.collection.mutable.ArrayBuffer -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.table.Row - -/** - * TypeInformation for [[Row]]. - */ -class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]]) - extends CaseClassTypeInfo[Row]( - classOf[Row], - Array(), - fieldTypes, - for (i <- fieldTypes.indices) yield "f" + i) -{ - - def this(fieldTypes: Array[TypeInformation[_]]) = { - this(fieldTypes.toSeq) - } - - /** - * Temporary variable for directly passing orders to comparators. - */ - var comparatorOrders: Option[Array[Boolean]] = None - - override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = { - val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity) - for (i <- 0 until getArity) { - fieldSerializers(i) = this.types(i).createSerializer(executionConfig) - .asInstanceOf[TypeSerializer[Any]] - } - - new RowSerializer(fieldSerializers) - } - - override def createComparator( - logicalKeyFields: Array[Int], - orders: Array[Boolean], - logicalFieldOffset: Int, - config: ExecutionConfig) - : TypeComparator[Row] = { - // store the order information for the builder - comparatorOrders = Some(orders) - val comparator = super.createComparator(logicalKeyFields, orders, logicalFieldOffset, config) - comparatorOrders = None - comparator - } - - override def createTypeComparatorBuilder(): TypeComparatorBuilder[Row] = { - new RowTypeComparatorBuilder(comparatorOrders.getOrElse( - throw new IllegalStateException("Cannot create comparator builder without orders."))) - } - - private class RowTypeComparatorBuilder( - comparatorOrders: Array[Boolean]) - extends TypeComparatorBuilder[Row] { - - val fieldComparators: ArrayBuffer[TypeComparator[_]] = new ArrayBuffer[TypeComparator[_]]() - val logicalKeyFields: ArrayBuffer[Int] = new ArrayBuffer[Int]() - - override def initializeTypeComparatorBuilder(size: Int): Unit = { - fieldComparators.sizeHint(size) - logicalKeyFields.sizeHint(size) - } - - override def addComparatorField(fieldId: Int, comparator: TypeComparator[_]): Unit = { - fieldComparators += comparator - logicalKeyFields += fieldId - } - - override def createTypeComparator(config: ExecutionConfig): TypeComparator[Row] = { - val maxIndex = logicalKeyFields.max - - new RowComparator( - getArity, - logicalKeyFields.toArray, - fieldComparators.toArray.asInstanceOf[Array[TypeComparator[Any]]], - types.take(maxIndex + 1).map(_.createSerializer(config).asInstanceOf[TypeSerializer[Any]]), - comparatorOrders - ) - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala deleted file mode 100644 index d72e7a8..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala +++ /dev/null @@ -1,882 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime.io - -import java.io.{File, FileOutputStream, OutputStreamWriter} -import java.nio.charset.StandardCharsets -import java.sql.{Date, Time, Timestamp} - -import org.apache.flink.api.common.io.ParseException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, createTempFile, testRemovingTrailingCR} -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.configuration.Configuration -import org.apache.flink.core.fs.{FileInputSplit, Path} -import org.apache.flink.types.parser.{FieldParser, StringParser} -import org.junit.Assert._ -import org.junit.{Ignore, Test} - -class RowCsvInputFormatTest { - - @Test - def ignoreInvalidLines() { - val fileContent = - "#description of the data\n" + - "header1|header2|header3|\n" + - "this is|1|2.0|\n" + - "//a comment\n" + - "a test|3|4.0|\n" + - "#next|5|6.0|\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") - format.setLenient(false) - val parameters = new Configuration - format.configure(parameters) - format.open(split) - - var result = new Row(3) - try { - result = format.nextRecord(result) - fail("Parse Exception was not thrown! (Row too short)") - } - catch { - case ex: ParseException => // ok - } - - try { - result = format.nextRecord(result) - fail("Parse Exception was not thrown! (Invalid int value)") - } - catch { - case ex: ParseException => // ok - } - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("this is", result.productElement(0)) - assertEquals(1, result.productElement(1)) - assertEquals(2.0, result.productElement(2)) - - try { - result = format.nextRecord(result) - fail("Parse Exception was not thrown! (Row too short)") - } - catch { - case ex: ParseException => // ok - } - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("a test", result.productElement(0)) - assertEquals(3, result.productElement(1)) - assertEquals(4.0, result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("#next", result.productElement(0)) - assertEquals(5, result.productElement(1)) - assertEquals(6.0, result.productElement(2)) - - result = format.nextRecord(result) - assertNull(result) - - // re-open with lenient = true - format.setLenient(true) - format.configure(parameters) - format.open(split) - - result = new Row(3) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("header1", result.productElement(0)) - assertNull(result.productElement(1)) - assertNull(result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("this is", result.productElement(0)) - assertEquals(1, result.productElement(1)) - assertEquals(2.0, result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("a test", result.productElement(0)) - assertEquals(3, result.productElement(1)) - assertEquals(4.0, result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("#next", result.productElement(0)) - assertEquals(5, result.productElement(1)) - assertEquals(6.0, result.productElement(2)) - result = format.nextRecord(result) - assertNull(result) - } - - @Test - def ignoreSingleCharPrefixComments() { - val fileContent = - "#description of the data\n" + - "#successive commented line\n" + - "this is|1|2.0|\n" + - "a test|3|4.0|\n" + - "#next|5|6.0|\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") - format.setCommentPrefix("#") - format.configure(new Configuration) - format.open(split) - - var result = new Row(3) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("this is", result.productElement(0)) - assertEquals(1, result.productElement(1)) - assertEquals(2.0, result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("a test", result.productElement(0)) - assertEquals(3, result.productElement(1)) - assertEquals(4.0, result.productElement(2)) - - result = format.nextRecord(result) - assertNull(result) - } - - @Test - def ignoreMultiCharPrefixComments() { - val fileContent = - "//description of the data\n" + - "//successive commented line\n" + - "this is|1|2.0|\n" + - "a test|3|4.0|\n" + - "//next|5|6.0|\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") - format.setCommentPrefix("//") - format.configure(new Configuration) - format.open(split) - - var result = new Row(3) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("this is", result.productElement(0)) - assertEquals(1, result.productElement(1)) - assertEquals(2.0, result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("a test", result.productElement(0)) - assertEquals(3, result.productElement(1)) - assertEquals(4.0, result.productElement(2)) - result = format.nextRecord(result) - assertNull(result) - } - - @Test - def readStringFields() { - val fileContent = "abc|def|ghijk\nabc||hhg\n|||" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") - format.configure(new Configuration) - format.open(split) - - var result = new Row(3) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("abc", result.productElement(0)) - assertEquals("def", result.productElement(1)) - assertEquals("ghijk", result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("abc", result.productElement(0)) - assertEquals("", result.productElement(1)) - assertEquals("hhg", result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("", result.productElement(0)) - assertEquals("", result.productElement(1)) - assertEquals("", result.productElement(2)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test def readMixedQuotedStringFields() { - val fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") - format.configure(new Configuration) - format.enableQuotedStringParsing('@') - format.open(split) - - var result = new Row(3) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("a|b|c", result.productElement(0)) - assertEquals("def", result.productElement(1)) - assertEquals("ghijk", result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("abc", result.productElement(0)) - assertEquals("", result.productElement(1)) - assertEquals("|hhg", result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("", result.productElement(0)) - assertEquals("", result.productElement(1)) - assertEquals("", result.productElement(2)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test def readStringFieldsWithTrailingDelimiters() { - val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") - format.setFieldDelimiter("|-") - format.configure(new Configuration) - format.open(split) - - var result = new Row(3) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("abc", result.productElement(0)) - assertEquals("def", result.productElement(1)) - assertEquals("ghijk", result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("abc", result.productElement(0)) - assertEquals("", result.productElement(1)) - assertEquals("hhg", result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals("", result.productElement(0)) - assertEquals("", result.productElement(1)) - assertEquals("", result.productElement(2)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test - def testIntegerFields() { - val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|") - - format.setFieldDelimiter("|") - format.configure(new Configuration) - format.open(split) - - var result = new Row(5) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(111, result.productElement(0)) - assertEquals(222, result.productElement(1)) - assertEquals(333, result.productElement(2)) - assertEquals(444, result.productElement(3)) - assertEquals(555, result.productElement(4)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(666, result.productElement(0)) - assertEquals(777, result.productElement(1)) - assertEquals(888, result.productElement(2)) - assertEquals(999, result.productElement(3)) - assertEquals(0, result.productElement(4)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test - def testEmptyFields() { - val fileContent = - ",,,,,,,,\n" + - ",,,,,,,,\n" + - ",,,,,,,,\n" + - ",,,,,,,,\n" + - ",,,,,,,,\n" + - ",,,,,,,,\n" + - ",,,,,,,,\n" + - ",,,,,,,,\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.BYTE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.FLOAT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.SHORT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyColumnAsNull = true) - format.setFieldDelimiter(",") - format.configure(new Configuration) - format.open(split) - - var result = new Row(8) - val linesCnt = fileContent.split("\n").length - - for (i <- 0 until linesCnt) yield { - result = format.nextRecord(result) - assertNull(result.productElement(i)) - } - - // ensure no more rows - assertNull(format.nextRecord(result)) - assertTrue(format.reachedEnd) - } - - @Test - def testDoubleFields() { - val fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) - format.setFieldDelimiter("|") - format.configure(new Configuration) - format.open(split) - - var result = new Row(5) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(11.1, result.productElement(0)) - assertEquals(22.2, result.productElement(1)) - assertEquals(33.3, result.productElement(2)) - assertEquals(44.4, result.productElement(3)) - assertEquals(55.5, result.productElement(4)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(66.6, result.productElement(0)) - assertEquals(77.7, result.productElement(1)) - assertEquals(88.8, result.productElement(2)) - assertEquals(99.9, result.productElement(3)) - assertEquals(0.0, result.productElement(4)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test - def testReadFirstN() { - val fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) - format.setFieldDelimiter("|") - format.configure(new Configuration) - format.open(split) - - var result = new Row(2) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(111, result.productElement(0)) - assertEquals(222, result.productElement(1)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(666, result.productElement(0)) - assertEquals(777, result.productElement(1)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test - def testReadSparseWithNullFieldsForTypes() { - val fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" + - "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|" - - val split = createTempFile(fileContent) - - val typeInfo: RowTypeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO)) - - val format = new RowCsvInputFormat( - PATH, - rowTypeInfo = typeInfo, - includedFieldsMask = Array(true, false, false, true, false, false, false, true)) - format.setFieldDelimiter("|x|") - format.configure(new Configuration) - format.open(split) - - var result = new Row(3) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(111, result.productElement(0)) - assertEquals(444, result.productElement(1)) - assertEquals(888, result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(0, result.productElement(0)) - assertEquals(777, result.productElement(1)) - assertEquals(333, result.productElement(2)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test - def testReadSparseWithPositionSetter() { - val fileContent = "111|222|333|444|555|666|777|888|999|000|\n" + - "000|999|888|777|666|555|444|333|222|111|" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO)) - - val format = new RowCsvInputFormat( - PATH, - typeInfo, - Array(0, 3, 7)) - format.setFieldDelimiter("|") - format.configure(new Configuration) - format.open(split) - - var result = new Row(3) - result = format.nextRecord(result) - - assertNotNull(result) - assertEquals(111, result.productElement(0)) - assertEquals(444, result.productElement(1)) - assertEquals(888, result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(0, result.productElement(0)) - assertEquals(777, result.productElement(1)) - assertEquals(333, result.productElement(2)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test - def testReadSparseWithMask() { - val fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" + - "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&" - - val split = RowCsvInputFormatTest.createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO)) - - val format = new RowCsvInputFormat( - PATH, - rowTypeInfo = typeInfo, - includedFieldsMask = Array(true, false, false, true, false, false, false, true)) - format.setFieldDelimiter("&&") - format.configure(new Configuration) - format.open(split) - - var result = new Row(3) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(111, result.productElement(0)) - assertEquals(444, result.productElement(1)) - assertEquals(888, result.productElement(2)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(0, result.productElement(0)) - assertEquals(777, result.productElement(1)) - assertEquals(333, result.productElement(2)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } - - @Test - def testParseStringErrors() { - val stringParser = new StringParser - stringParser.enableQuotedStringParsing('"'.toByte) - - val failures = Seq( - ("\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING), - ("\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING) - ) - - for (failure <- failures) { - val result = stringParser.parseField( - failure._1.getBytes, - 0, - failure._1.length, - Array[Byte]('|'), - null) - - assertEquals(-1, result) - assertEquals(failure._2, stringParser.getErrorState) - } - } - - // Test disabled because we do not support double-quote escaped quotes right now. - @Test - @Ignore - def testParserCorrectness() { - // RFC 4180 Compliance Test content - // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example - val fileContent = "Year,Make,Model,Description,Price\n" + - "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" + - "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" + - "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" + - "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" + - ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO)) - - val format = new RowCsvInputFormat(PATH, typeInfo) - format.setSkipFirstLineAsHeader(true) - format.setFieldDelimiter(",") - format.configure(new Configuration) - format.open(split) - - var result = new Row(5) - val r1: Row = new Row(5) - r1.setField(0, 1997) - r1.setField(1, "Ford") - r1.setField(2, "E350") - r1.setField(3, "ac, abs, moon") - r1.setField(4, 3000.0) - - val r2: Row = new Row(5) - r2.setField(0, 1999) - r2.setField(1, "Chevy") - r2.setField(2, "Venture \"Extended Edition\"") - r2.setField(3, "") - r2.setField(4, 4900.0) - - val r3: Row = new Row(5) - r3.setField(0, 1996) - r3.setField(1, "Jeep") - r3.setField(2, "Grand Cherokee") - r3.setField(3, "MUST SELL! air, moon roof, loaded") - r3.setField(4, 4799.0) - - val r4: Row = new Row(5) - r4.setField(0, 1999) - r4.setField(1, "Chevy") - r4.setField(2, "Venture \"Extended Edition, Very Large\"") - r4.setField(3, "") - r4.setField(4, 5000.0) - - val r5: Row = new Row(5) - r5.setField(0, 0) - r5.setField(1, "") - r5.setField(2, "Venture \"Extended Edition\"") - r5.setField(3, "") - r5.setField(4, 4900.0) - - val expectedLines = Array(r1, r2, r3, r4, r5) - for (expected <- expectedLines) { - result = format.nextRecord(result) - assertEquals(expected, result) - } - assertNull(format.nextRecord(result)) - assertTrue(format.reachedEnd) - } - - @Test - def testWindowsLineEndRemoval() { - - // check typical use case -- linux file is correct and it is set up to linux(\n) - testRemovingTrailingCR("\n", "\n") - - // check typical windows case -- windows file endings and file has windows file endings set up - testRemovingTrailingCR("\r\n", "\r\n") - - // check problematic case windows file -- windows file endings(\r\n) - // but linux line endings (\n) set up - testRemovingTrailingCR("\r\n", "\n") - - // check problematic case linux file -- linux file endings (\n) - // but windows file endings set up (\r\n) - // specific setup for windows line endings will expect \r\n because - // it has to be set up and is not standard. - } - - @Test - def testQuotedStringParsingWithIncludeFields() { - val fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" + - "\"Blahblah <[email protected]>\"|\"blaaa|\"blubb\"" - val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp") - tempFile.deleteOnExit() - tempFile.setWritable(true) - - val writer = new OutputStreamWriter(new FileOutputStream(tempFile)) - writer.write(fileContent) - writer.close() - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO)) - - val inputFormat = new RowCsvInputFormat( - new Path(tempFile.toURI.toString), - rowTypeInfo = typeInfo, - includedFieldsMask = Array(true, false, true)) - inputFormat.enableQuotedStringParsing('"') - inputFormat.setFieldDelimiter("|") - inputFormat.setDelimiter('\n') - inputFormat.configure(new Configuration) - - val splits = inputFormat.createInputSplits(1) - inputFormat.open(splits(0)) - - val record = inputFormat.nextRecord(new Row(2)) - assertEquals("20:41:52-1-3-2015", record.productElement(0)) - assertEquals("Blahblah <[email protected]>", record.productElement(1)) - } - - @Test - def testQuotedStringParsingWithEscapedQuotes() { - val fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\"" - val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp") - tempFile.deleteOnExit() - tempFile.setWritable(true) - - val writer = new OutputStreamWriter(new FileOutputStream(tempFile)) - writer.write(fileContent) - writer.close() - - val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO)) - - val inputFormat = new RowCsvInputFormat( - new Path(tempFile.toURI.toString), - rowTypeInfo = typeInfo) - inputFormat.enableQuotedStringParsing('"') - inputFormat.setFieldDelimiter("|") - inputFormat.setDelimiter('\n') - inputFormat.configure(new Configuration) - - val splits = inputFormat.createInputSplits(1) - inputFormat.open(splits(0)) - - val record = inputFormat.nextRecord(new Row(2)) - assertEquals("\\\"Hello\\\" World", record.productElement(0)) - assertEquals("We are\\\" young", record.productElement(1)) - } - - @Test - def testSqlTimeFields() { - val fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" + - "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n" - - val split = createTempFile(fileContent) - - val typeInfo = new RowTypeInfo(Seq( - SqlTimeTypeInfo.DATE, - SqlTimeTypeInfo.TIME, - SqlTimeTypeInfo.TIMESTAMP, - SqlTimeTypeInfo.TIMESTAMP)) - - val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) - format.setFieldDelimiter("|") - format.configure(new Configuration) - format.open(split) - - var result = new Row(4) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(Date.valueOf("1990-10-14"), result.productElement(0)) - assertEquals(Time.valueOf("02:42:25"), result.productElement(1)) - assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2)) - assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.productElement(3)) - - result = format.nextRecord(result) - assertNotNull(result) - assertEquals(Date.valueOf("1990-10-14"), result.productElement(0)) - assertEquals(Time.valueOf("02:42:25"), result.productElement(1)) - assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2)) - assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.productElement(3)) - - result = format.nextRecord(result) - assertNull(result) - assertTrue(format.reachedEnd) - } -} - -object RowCsvInputFormatTest { - - private val PATH = new Path("an/ignored/file/") - - // static variables for testing the removal of \r\n to \n - private val FIRST_PART = "That is the first part" - private val SECOND_PART = "That is the second part" - - private def createTempFile(content: String): FileInputSplit = { - val tempFile = File.createTempFile("test_contents", "tmp") - tempFile.deleteOnExit() - val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8) - wrt.write(content) - wrt.close() - new FileInputSplit( - 0, - new Path(tempFile.toURI.toString), - 0, - tempFile.length, - Array("localhost")) - } - - private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) { - val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile - - // create input file - val tempFile = File.createTempFile("CsvInputFormatTest", "tmp") - tempFile.deleteOnExit() - tempFile.setWritable(true) - - val wrt = new OutputStreamWriter(new FileOutputStream(tempFile)) - wrt.write(fileContent) - wrt.close() - - val typeInfo = new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO)) - - val inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI.toString), typeInfo) - inputFormat.configure(new Configuration) - inputFormat.setDelimiter(lineBreakerSetup) - - val splits = inputFormat.createInputSplits(1) - inputFormat.open(splits(0)) - - var result = inputFormat.nextRecord(new Row(1)) - assertNotNull("Expecting to not return null", result) - assertEquals(FIRST_PART, result.productElement(0)) - - result = inputFormat.nextRecord(result) - assertNotNull("Expecting to not return null", result) - assertEquals(SECOND_PART, result.productElement(0)) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala deleted file mode 100644 index 557db3a..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer} -import org.apache.flink.api.java.tuple -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.typeutils.RowComparatorTest.MyPojo -import org.junit.Assert._ - -class RowComparatorTest extends ComparatorTestBase[Row] { - - val typeInfo = new RowTypeInfo( - Array( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - new TupleTypeInfo[tuple.Tuple2[Int, Boolean]]( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.SHORT_TYPE_INFO), - TypeExtractor.createTypeInfo(classOf[MyPojo]))) - - val testPojo1 = new MyPojo() - // TODO we cannot test null here as PojoComparator has no support for null keys - testPojo1.name = "" - val testPojo2 = new MyPojo() - testPojo2.name = "Test1" - val testPojo3 = new MyPojo() - testPojo3.name = "Test2" - - val data: Array[Row] = Array( - createRow(null, null, null, null, null), - createRow(0, null, null, null, null), - createRow(0, 0.0, null, null, null), - createRow(0, 0.0, "a", null, null), - createRow(1, 0.0, "a", null, null), - createRow(1, 1.0, "a", null, null), - createRow(1, 1.0, "b", null, null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3) - ) - - override protected def deepEquals(message: String, should: Row, is: Row): Unit = { - val arity = should.productArity - assertEquals(message, arity, is.productArity) - var index = 0 - while (index < arity) { - val copiedValue: Any = should.productElement(index) - val element: Any = is.productElement(index) - assertEquals(message, element, copiedValue) - index += 1 - } - } - - override protected def createComparator(ascending: Boolean): TypeComparator[Row] = { - typeInfo.createComparator( - Array(0, 1, 2, 3, 4, 5, 6), - Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending), - 0, - new ExecutionConfig()) - } - - override protected def createSerializer(): TypeSerializer[Row] = { - typeInfo.createSerializer(new ExecutionConfig()) - } - - override protected def getSortedTestData: Array[Row] = { - data - } - - override protected def supportsNullKeys: Boolean = true - - def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = { - val r: Row = new Row(5) - r.setField(0, f0) - r.setField(1, f1) - r.setField(2, f2) - r.setField(3, f3) - r.setField(4, f4) - r - } -} - -object RowComparatorTest { - - class MyPojo() extends Serializable with Comparable[MyPojo] { - // we cannot use null because the PojoComparator does not support null properly - var name: String = "" - - override def compareTo(o: MyPojo): Int = { - if (name == null && o.name == null) { - 0 - } - else if (name == null) { - -1 - } - else if (o.name == null) { - 1 - } - else { - name.compareTo(o.name) - } - } - - override def equals(other: Any): Boolean = other match { - case that: MyPojo => compareTo(that) == 0 - case _ => false - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala deleted file mode 100644 index 33715c1..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer} -import org.apache.flink.api.table.Row -import org.apache.flink.util.Preconditions -import org.junit.Assert._ - -/** - * Tests [[RowComparator]] for wide rows. - */ -class RowComparatorWithManyFieldsTest extends ComparatorTestBase[Row] { - val numberOfFields = 10 - val fieldTypes = new Array[TypeInformation[_]](numberOfFields) - for (i <- 0 until numberOfFields) { - fieldTypes(i) = BasicTypeInfo.STRING_TYPE_INFO - } - val typeInfo = new RowTypeInfo(fieldTypes) - - val data: Array[Row] = Array( - createRow(Array(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", "j0")), - createRow(Array("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", "j1")), - createRow(Array("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", "j2")), - createRow(Array("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3")) - ) - - override protected def deepEquals(message: String, should: Row, is: Row): Unit = { - val arity = should.productArity - assertEquals(message, arity, is.productArity) - var index = 0 - while (index < arity) { - val copiedValue: Any = should.productElement(index) - val element: Any = is.productElement(index) - assertEquals(message, element, copiedValue) - index += 1 - } - } - - override protected def createComparator(ascending: Boolean): TypeComparator[Row] = { - typeInfo.createComparator( - Array(0), - Array(ascending), - 0, - new ExecutionConfig()) - } - - override protected def createSerializer(): TypeSerializer[Row] = { - typeInfo.createSerializer(new ExecutionConfig()) - } - - override protected def getSortedTestData: Array[Row] = { - data - } - - override protected def supportsNullKeys: Boolean = true - - private def createRow(values: Array[_]): Row = { - Preconditions.checkArgument(values.length == numberOfFields) - val r: Row = new Row(numberOfFields) - values.zipWithIndex.foreach { case (e, i) => r.setField(i, e) } - r - } -}
