This is an automated email from the ASF dual-hosted git repository.

lirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 86da866  [FLINK-17783][orc] Add array,map,row types support for orc 
row writer
86da866 is described below

commit 86da8660b4d732c34c9a726094acb602b4eff503
Author: wangwei1025 <[email protected]>
AuthorDate: Sun Apr 25 14:14:21 2021 +0800

    [FLINK-17783][orc] Add array,map,row types support for orc row writer
    
    This closes #15746
---
 .../apache/flink/orc/vector/RowDataVectorizer.java | 119 ++++++++
 .../flink/orc/writer/OrcBulkRowDataWriterTest.java | 298 +++++++++++++++++++++
 2 files changed, 417 insertions(+)

diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java
index 932051d..34d2d46 100644
--- 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java
@@ -18,10 +18,16 @@
 
 package org.apache.flink.orc.vector;
 
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -29,7 +35,10 @@ import 
org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
@@ -151,8 +160,118 @@ public class RowDataVectorizer extends 
Vectorizer<RowData> {
                     vector.set(rowId, timestamp);
                     break;
                 }
+            case ARRAY:
+                {
+                    ListColumnVector listColumnVector = (ListColumnVector) 
column;
+                    setColumn(rowId, listColumnVector, type, row, columnId);
+                    break;
+                }
+            case MAP:
+                {
+                    MapColumnVector mapColumnVector = (MapColumnVector) column;
+                    setColumn(rowId, mapColumnVector, type, row, columnId);
+                    break;
+                }
+            case ROW:
+                {
+                    StructColumnVector structColumnVector = 
(StructColumnVector) column;
+                    setColumn(rowId, structColumnVector, type, row, columnId);
+                    break;
+                }
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + 
type);
         }
     }
+
+    private static void setColumn(
+            int rowId,
+            ListColumnVector listColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        ArrayData arrayData = row.getArray(columnId);
+        ArrayType arrayType = (ArrayType) type;
+        listColumnVector.lengths[rowId] = arrayData.size();
+        listColumnVector.offsets[rowId] = listColumnVector.childCount;
+        listColumnVector.childCount += listColumnVector.lengths[rowId];
+        listColumnVector.child.ensureSize(
+                listColumnVector.childCount, listColumnVector.offsets[rowId] 
!= 0);
+
+        RowData convertedRowData = convert(arrayData, 
arrayType.getElementType());
+        for (int i = 0; i < arrayData.size(); i++) {
+            setColumn(
+                    (int) listColumnVector.offsets[rowId] + i,
+                    listColumnVector.child,
+                    arrayType.getElementType(),
+                    convertedRowData,
+                    i);
+        }
+    }
+
+    private static void setColumn(
+            int rowId,
+            MapColumnVector mapColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        MapData mapData = row.getMap(columnId);
+        MapType mapType = (MapType) type;
+        ArrayData keyArray = mapData.keyArray();
+        ArrayData valueArray = mapData.valueArray();
+        mapColumnVector.lengths[rowId] = mapData.size();
+        mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
+        mapColumnVector.childCount += mapColumnVector.lengths[rowId];
+        mapColumnVector.keys.ensureSize(
+                mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 
0);
+        mapColumnVector.values.ensureSize(
+                mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 
0);
+
+        RowData convertedKeyRowData = convert(keyArray, mapType.getKeyType());
+        RowData convertedValueRowData = convert(valueArray, 
mapType.getValueType());
+        for (int i = 0; i < keyArray.size(); i++) {
+            setColumn(
+                    (int) mapColumnVector.offsets[rowId] + i,
+                    mapColumnVector.keys,
+                    mapType.getKeyType(),
+                    convertedKeyRowData,
+                    i);
+            setColumn(
+                    (int) mapColumnVector.offsets[rowId] + i,
+                    mapColumnVector.values,
+                    mapType.getValueType(),
+                    convertedValueRowData,
+                    i);
+        }
+    }
+
+    private static void setColumn(
+            int rowId,
+            StructColumnVector structColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        RowData structRow = row.getRow(columnId, 
structColumnVector.fields.length);
+        RowType rowType = (RowType) type;
+        for (int i = 0; i < structRow.getArity(); i++) {
+            ColumnVector cv = structColumnVector.fields[i];
+            setColumn(rowId, cv, rowType.getTypeAt(i), structRow, i);
+        }
+    }
+
+    /**
+     * Converting ArrayData to RowData for calling {@link 
RowDataVectorizer#setColumn(int,
+     * ColumnVector, LogicalType, RowData, int)} recursively with array.
+     *
+     * @param arrayData input ArrayData.
+     * @param arrayFieldType LogicalType of input ArrayData.
+     * @return RowData.
+     */
+    private static RowData convert(ArrayData arrayData, LogicalType 
arrayFieldType) {
+        GenericRowData rowData = new GenericRowData(arrayData.size());
+        ArrayData.ElementGetter elementGetter = 
ArrayData.createElementGetter(arrayFieldType);
+        for (int i = 0; i < arrayData.size(); i++) {
+            rowData.setField(i, elementGetter.getElementOrNull(arrayData, i));
+        }
+        return rowData;
+    }
 }
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkRowDataWriterTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkRowDataWriterTest.java
new file mode 100644
index 0000000..6ffa51e
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkRowDataWriterTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.vector.RowDataVectorizer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/** Unit test for the ORC BulkWriter write RowData with nested type. */
+public class OrcBulkRowDataWriterTest {
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @SuppressWarnings("FieldCanBeLocal")
+    private String schema =
+            
"struct<_col0:string,_col1:int,_col2:array<struct<_col2_col0:string>>,"
+                    + 
"_col3:map<string,struct<_col3_col0:string,_col3_col1:timestamp>>>";
+
+    private LogicalType[] fieldTypes;
+    private List<RowData> input;
+
+    @Test
+    public void testOrcBulkWriterWithRowData() throws Exception {
+        final File outDir = TEMPORARY_FOLDER.newFolder();
+        final Properties writerProps = new Properties();
+        writerProps.setProperty("orc.compress", "LZ4");
+
+        final OrcBulkWriterFactory<RowData> writer =
+                new OrcBulkWriterFactory<>(
+                        new RowDataVectorizer(schema, fieldTypes),
+                        writerProps,
+                        new Configuration());
+
+        StreamingFileSink<RowData> sink =
+                StreamingFileSink.forBulkFormat(new Path(outDir.toURI()), 
writer)
+                        .withBucketCheckInterval(10000)
+                        .build();
+
+        try (OneInputStreamOperatorTestHarness<RowData, Object> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), 1, 1, 0)) {
+
+            testHarness.setup();
+            testHarness.open();
+
+            int time = 0;
+            for (final RowData record : input) {
+                testHarness.processElement(record, ++time);
+            }
+
+            testHarness.snapshot(1, ++time);
+            testHarness.notifyOfCompletedCheckpoint(1);
+
+            validate(outDir, input);
+        }
+    }
+
+    @Before
+    public void initInput() {
+        input = new ArrayList<>();
+        fieldTypes = new LogicalType[4];
+        fieldTypes[0] = new VarCharType();
+        fieldTypes[1] = new IntType();
+        List<RowType.RowField> arrayRowFieldList =
+                Collections.singletonList(new RowType.RowField("_col2_col0", 
new VarCharType()));
+        fieldTypes[2] = new ArrayType(new RowType(arrayRowFieldList));
+        List<RowType.RowField> mapRowFieldList =
+                Arrays.asList(
+                        new RowType.RowField("_col3_col0", new VarCharType()),
+                        new RowType.RowField("_col3_col1", new 
TimestampType()));
+        fieldTypes[3] = new MapType(new VarCharType(), new 
RowType(mapRowFieldList));
+
+        {
+            GenericRowData rowData = new GenericRowData(4);
+
+            rowData.setField(0, new BinaryStringData("_col_0_string_1"));
+
+            rowData.setField(1, 1);
+
+            GenericRowData arrayValue1 = new GenericRowData(1);
+            arrayValue1.setField(0, new 
BinaryStringData("_col_2_row_0_string_1"));
+            GenericRowData arrayValue2 = new GenericRowData(1);
+            arrayValue2.setField(0, new 
BinaryStringData("_col_2_row_1_string_1"));
+            GenericArrayData arrayData =
+                    new GenericArrayData(new Object[] {arrayValue1, 
arrayValue2});
+            rowData.setField(2, arrayData);
+
+            GenericRowData mapValue1 = new GenericRowData(2);
+            mapValue1.setField(0, new 
BinaryStringData(("_col_3_map_value_string_1")));
+            mapValue1.setField(1, TimestampData.fromTimestamp(new 
Timestamp(3600000)));
+            Map<StringData, RowData> mapDataMap = new HashMap<>();
+            mapDataMap.put(new BinaryStringData("_col_3_map_key_1"), 
mapValue1);
+            GenericMapData mapData = new GenericMapData(mapDataMap);
+            rowData.setField(3, mapData);
+
+            input.add(rowData);
+        }
+
+        {
+            GenericRowData rowData = new GenericRowData(4);
+
+            rowData.setField(0, new BinaryStringData("_col_0_string_2"));
+
+            rowData.setField(1, 2);
+
+            GenericRowData arrayValue1 = new GenericRowData(1);
+            arrayValue1.setField(0, new 
BinaryStringData("_col_2_row_0_string_2"));
+            GenericRowData arrayValue2 = new GenericRowData(1);
+            arrayValue2.setField(0, new 
BinaryStringData("_col_2_row_1_string_2"));
+            GenericArrayData arrayData =
+                    new GenericArrayData(new Object[] {arrayValue1, 
arrayValue2});
+            rowData.setField(2, arrayData);
+
+            GenericRowData mapValue1 = new GenericRowData(2);
+            mapValue1.setField(0, new 
BinaryStringData(("_col_3_map_value_string_2")));
+            mapValue1.setField(1, TimestampData.fromTimestamp(new 
Timestamp(3600000)));
+            Map<StringData, RowData> mapDataMap = new HashMap<>();
+            mapDataMap.put(new BinaryStringData("_col_3_map_key_2"), 
mapValue1);
+            GenericMapData mapData = new GenericMapData(mapDataMap);
+            rowData.setField(3, mapData);
+            input.add(rowData);
+        }
+    }
+
+    private void validate(File files, List<RowData> expected) throws 
IOException {
+        final File[] buckets = files.listFiles();
+        assertNotNull(buckets);
+        assertEquals(1, buckets.length);
+
+        final File[] partFiles = buckets[0].listFiles();
+        assertNotNull(partFiles);
+
+        for (File partFile : partFiles) {
+            assertTrue(partFile.length() > 0);
+
+            OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new 
Configuration());
+            Reader reader =
+                    OrcFile.createReader(
+                            new org.apache.hadoop.fs.Path(partFile.toURI()), 
readerOptions);
+
+            assertEquals(2, reader.getNumberOfRows());
+            assertEquals(4, reader.getSchema().getFieldNames().size());
+            assertSame(reader.getCompressionKind(), CompressionKind.LZ4);
+
+            List<RowData> results = getResults(reader);
+
+            assertEquals(2, results.size());
+            assertEquals(results, expected);
+        }
+    }
+
+    private static List<RowData> getResults(Reader reader) throws IOException {
+        List<RowData> results = new ArrayList<>();
+
+        RecordReader recordReader = reader.rows();
+        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
+
+        while (recordReader.nextBatch(batch)) {
+            BytesColumnVector stringVector = (BytesColumnVector) batch.cols[0];
+            LongColumnVector intVector = (LongColumnVector) batch.cols[1];
+            ListColumnVector listVector = (ListColumnVector) batch.cols[2];
+            MapColumnVector mapVector = (MapColumnVector) batch.cols[3];
+
+            for (int r = 0; r < batch.size; r++) {
+                GenericRowData readRowData = new GenericRowData(4);
+
+                readRowData.setField(0, readStringData(stringVector, r));
+                readRowData.setField(1, readInt(intVector, r));
+                readRowData.setField(2, readList(listVector, r));
+                readRowData.setField(3, readMap(mapVector, r));
+
+                results.add(readRowData);
+            }
+            recordReader.close();
+        }
+
+        return results;
+    }
+
+    private static StringData readStringData(BytesColumnVector stringVector, 
int row) {
+        return new BinaryStringData(
+                new String(
+                        stringVector.vector[row],
+                        stringVector.start[row],
+                        stringVector.length[row]));
+    }
+
+    private static int readInt(LongColumnVector intVector, int row) {
+        return (int) intVector.vector[row];
+    }
+
+    /** Read ListColumnVector with specify schema {@literal 
array<struct<_col2_col0:string>>}. */
+    private static ArrayData readList(ListColumnVector listVector, int row) {
+        int offset = (int) listVector.offsets[row];
+        StructColumnVector structChild = (StructColumnVector) listVector.child;
+        BytesColumnVector valueChild = (BytesColumnVector) 
structChild.fields[0];
+
+        StringData value1 = readStringData(valueChild, offset);
+        GenericRowData arrayValue1 = new GenericRowData(1);
+        arrayValue1.setField(0, value1);
+
+        StringData value2 = readStringData(valueChild, offset + 1);
+        GenericRowData arrayValue2 = new GenericRowData(1);
+        arrayValue2.setField(0, (value2));
+
+        return new GenericArrayData(new Object[] {arrayValue1, arrayValue2});
+    }
+
+    /**
+     * Read MapColumnVector with specify schema {@literal
+     * map<string,struct<_col3_col0:string,_col3_col1:timestamp>>}.
+     */
+    private static MapData readMap(MapColumnVector mapVector, int row) {
+        int offset = (int) mapVector.offsets[row];
+        StringData keyData = readStringData((BytesColumnVector) 
mapVector.keys, offset);
+        GenericRowData valueData = new GenericRowData(2);
+        StructColumnVector structVector = (StructColumnVector) 
mapVector.values;
+        BytesColumnVector bytesVector = (BytesColumnVector) 
structVector.fields[0];
+        TimestampColumnVector timestampVector = (TimestampColumnVector) 
structVector.fields[1];
+        StringData strValueData = readStringData(bytesVector, offset);
+        TimestampData timestampData = readTimestamp(timestampVector, offset);
+        valueData.setField(0, strValueData);
+        valueData.setField(1, timestampData);
+        Map<StringData, RowData> mapDataMap = new HashMap<>();
+        mapDataMap.put(keyData, valueData);
+        return new GenericMapData(mapDataMap);
+    }
+
+    private static TimestampData readTimestamp(TimestampColumnVector 
timestampVector, int row) {
+        return 
TimestampData.fromTimestamp(timestampVector.asScratchTimestamp(row));
+    }
+}

Reply via email to