openinx commented on a change in pull request #1255:
URL: https://github.com/apache/iceberg/pull/1255#discussion_r473043330
##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -87,7 +87,7 @@ private GenericOrcWriters() {
return UUIDWriter.INSTANCE;
}
- public static OrcValueWriter<byte[]> fixed() {
+ public static OrcValueWriter<byte[]> bytes() {
Review comment:
Sounds good.
##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -326,7 +326,7 @@ public void nonNullWrite(int rowId, LocalDateTime data,
ColumnVector output) {
public void nonNullWrite(int rowId, BigDecimal data, ColumnVector output) {
// TODO: validate precision and scale from schema
((DecimalColumnVector) output).vector[rowId]
- .setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
+ .setFromLongAndScale(data.unscaledValue().longValueExact(),
data.scale());
Review comment:
Actually, we don't need to change this now, because this merged patch
has fixed it.
https://github.com/apache/iceberg/commit/6f96b36a39f26cfbc6f66dc762148577e5697534#diff-b1b07b15f036000a3f2bed76fdd9f961R334
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
+
+class FlinkOrcReaders {
+ private FlinkOrcReaders() {
+ }
+
+ static OrcValueReader<StringData> strings() {
+ return StringReader.INSTANCE;
+ }
+
+ static OrcValueReader<Integer> dates() {
+ return DateReader.INSTANCE;
+ }
+
+ static OrcValueReader<DecimalData> decimals(int precision, int scale) {
+ if (precision <= 18) {
+ return new Decimal18Reader(precision, scale);
+ } else if (precision <= 38) {
+ return new Decimal38Reader(precision, scale);
+ } else {
+ throw new IllegalArgumentException("Invalid precision: " + precision);
+ }
+ }
+
+ static OrcValueReader<Integer> times() {
+ return TimeReader.INSTANCE;
+ }
+
+ static OrcValueReader<TimestampData> timestamps() {
+ return TimestampReader.INSTANCE;
+ }
+
+ static OrcValueReader<TimestampData> timestampTzs() {
+ return TimestampTzReader.INSTANCE;
+ }
+
+ static <T> OrcValueReader<ArrayData> array(OrcValueReader<T> elementReader) {
+ return new ArrayReader<>(elementReader);
+ }
+
+ public static <K, V> OrcValueReader<MapData> map(OrcValueReader<K>
keyReader, OrcValueReader<V> valueReader) {
+ return new MapReader<>(keyReader, valueReader);
+ }
+
+ public static OrcValueReader<RowData> struct(List<OrcValueReader<?>> readers,
+ Types.StructType struct,
+ Map<Integer, ?> idToConstant) {
+ return new StructReader(readers, struct, idToConstant);
+ }
+
+ private static class StringReader implements OrcValueReader<StringData> {
+ private static final StringReader INSTANCE = new StringReader();
+
+ @Override
+ public StringData nonNullRead(ColumnVector vector, int row) {
+ BytesColumnVector bytesVector = (BytesColumnVector) vector;
+ return StringData.fromBytes(bytesVector.vector[row],
bytesVector.start[row], bytesVector.length[row]);
+ }
+ }
+
+ private static class DateReader implements OrcValueReader<Integer> {
+ private static final DateReader INSTANCE = new DateReader();
+
+ @Override
+ public Integer nonNullRead(ColumnVector vector, int row) {
+ return (int) ((LongColumnVector) vector).vector[row];
+ }
+ }
+
+ private static class Decimal18Reader implements OrcValueReader<DecimalData> {
+ private final int precision;
+ private final int scale;
+
+ Decimal18Reader(int precision, int scale) {
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @Override
+ public DecimalData nonNullRead(ColumnVector vector, int row) {
+ HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];
+ return DecimalData.fromUnscaledLong(value.serialize64(scale), precision,
scale);
+ }
+ }
+
+ private static class Decimal38Reader implements OrcValueReader<DecimalData> {
+ private final int precision;
+ private final int scale;
+
+ Decimal38Reader(int precision, int scale) {
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @Override
+ public DecimalData nonNullRead(ColumnVector vector, int row) {
+ BigDecimal value = ((DecimalColumnVector)
vector).vector[row].getHiveDecimal().bigDecimalValue();
+ return DecimalData.fromBigDecimal(value, precision, scale);
+ }
+ }
+
+ private static class TimeReader implements OrcValueReader<Integer> {
+ private static final TimeReader INSTANCE = new TimeReader();
+
+ @Override
+ public Integer nonNullRead(ColumnVector vector, int row) {
+ long micros = ((LongColumnVector) vector).vector[row];
+ // Flink only support time mills, just erase micros.
+ return (int) (micros / 1000);
+ }
+ }
+
+ private static class TimestampReader implements
OrcValueReader<TimestampData> {
+ private static final TimestampReader INSTANCE = new TimestampReader();
+
+ @Override
+ public TimestampData nonNullRead(ColumnVector vector, int row) {
+ TimestampColumnVector tcv = (TimestampColumnVector) vector;
+ LocalDateTime localDate =
Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000), tcv.nanos[row])
+ .atOffset(ZoneOffset.UTC)
+ .toLocalDateTime();
+ return TimestampData.fromLocalDateTime(localDate);
+ }
+ }
+
+ private static class TimestampTzReader implements
OrcValueReader<TimestampData> {
+ private static final TimestampTzReader INSTANCE = new TimestampTzReader();
+
+ @Override
+ public TimestampData nonNullRead(ColumnVector vector, int row) {
+ TimestampColumnVector tcv = (TimestampColumnVector) vector;
+ Instant instant = Instant.ofEpochSecond(Math.floorDiv(tcv.time[row],
1_000), tcv.nanos[row])
+ .atOffset(ZoneOffset.UTC)
+ .toInstant();
+ return TimestampData.fromInstant(instant);
+ }
+ }
+
+ private static class ArrayReader<T> implements OrcValueReader<ArrayData> {
+ private final OrcValueReader<T> elementReader;
+
+ private ArrayReader(OrcValueReader<T> elementReader) {
+ this.elementReader = elementReader;
+ }
+
+ @Override
+ public ArrayData nonNullRead(ColumnVector vector, int row) {
+ ListColumnVector listVector = (ListColumnVector) vector;
+ int offset = (int) listVector.offsets[row];
+ int length = (int) listVector.lengths[row];
+ List<T> elements = Lists.newArrayListWithExpectedSize(length);
Review comment:
One way to reuse the ArrayList is: make it as a ThreadLocal<List<T>>,
then each thread will share the same List<T> instance. when reading
ArrayData, we get the ThreadLocal list and clear it (the array list's space
won't shrink and only free the references to elements). Then read values into
the list. One thing need to concern is the size of list: if we read an
ArrayData with many elements, then the ThreadLocal list may expand to a huge
list too, that would waste much memory. I did not get a good idea to handle
such case, I also see other orc readers are allocating the list, maybe we
could handle this in a separate issue.
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DataTest;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+public class TestFlinkOrcReaderWriter extends DataTest {
+ private static final int NUM_RECORDS = 200;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Override
+ protected void writeAndValidate(Schema schema) throws IOException {
Review comment:
You testing method is correct, but we don't have `assertEquals(schema,
genericRow, flinkRow)` before Junjie's parquet readers & writers patch get in.
So I changed to another way to verify the data:
1. generate `List<records>` by random generater;
2. convert the records to RowData list;
3. writer records from step1 to orc file, and reading them into RowData
list, and compare with RowData from step2;
4. write RowData from step2 into orc file, and reading them into records,
and compare with Records from step1.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]