wangwei1025 commented on a change in pull request #15746:
URL: https://github.com/apache/flink/pull/15746#discussion_r619948239



##########
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java
##########
@@ -151,8 +160,114 @@ private static void setColumn(
                     vector.set(rowId, timestamp);
                     break;
                 }
+            case ARRAY:
+                {
+                    ListColumnVector listColumnVector = (ListColumnVector) 
column;
+                    setColumn(rowId, listColumnVector, type, row, columnId);
+                    break;
+                }
+            case MAP:
+                {
+                    MapColumnVector mapColumnVector = (MapColumnVector) column;
+                    setColumn(rowId, mapColumnVector, type, row, columnId);
+                    break;
+                }
+            case ROW:
+                {
+                    StructColumnVector structColumnVector = 
(StructColumnVector) column;
+                    setColumn(rowId, structColumnVector, type, row, columnId);
+                    break;
+                }
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + 
type);
         }
     }
+
+    private static void setColumn(
+            int rowId,
+            ListColumnVector listColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        ArrayData arrayData = row.getArray(columnId);
+        ArrayType arrayType = (ArrayType) type;
+        listColumnVector.lengths[rowId] = arrayData.size();
+        listColumnVector.offsets[rowId] = listColumnVector.childCount;
+        listColumnVector.childCount += listColumnVector.lengths[rowId];
+        listColumnVector.child.ensureSize(
+                listColumnVector.childCount, listColumnVector.offsets[rowId] 
!= 0);
+        for (int i = 0; i < arrayData.size(); i++) {
+            setColumn(
+                    (int) listColumnVector.offsets[rowId] + i,
+                    listColumnVector.child,
+                    arrayType.getElementType(),
+                    convert(arrayData, arrayType.getElementType()),
+                    i);
+        }
+    }
+
+    private static void setColumn(
+            int rowId,
+            MapColumnVector mapColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        MapData mapData = row.getMap(columnId);
+        MapType mapType = (MapType) type;
+        ArrayData keyArray = mapData.keyArray();
+        ArrayData valueArray = mapData.valueArray();
+        mapColumnVector.lengths[rowId] = mapData.size();
+        mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
+        mapColumnVector.childCount += mapColumnVector.lengths[rowId];
+        mapColumnVector.keys.ensureSize(
+                mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 
0);
+        mapColumnVector.values.ensureSize(
+                mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 
0);
+
+        for (int i = 0; i < keyArray.size(); i++) {
+            setColumn(
+                    (int) mapColumnVector.offsets[rowId] + i,
+                    mapColumnVector.keys,
+                    mapType.getKeyType(),
+                    convert(keyArray, mapType.getKeyType()),
+                    i);
+            setColumn(
+                    (int) mapColumnVector.offsets[rowId] + i,
+                    mapColumnVector.values,
+                    mapType.getValueType(),
+                    convert(valueArray, mapType.getValueType()),
+                    i);
+        }
+    }
+
+    private static void setColumn(
+            int rowId,
+            StructColumnVector structColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        RowData structRow = row.getRow(columnId, 
structColumnVector.fields.length);
+        RowType rowType = (RowType) type;
+        for (int i = 0; i < structRow.getArity(); i++) {
+            ColumnVector cv = structColumnVector.fields[i];
+            setColumn(rowId, cv, rowType.getTypeAt(i), structRow, i);
+        }
+    }
+
+    /**
+     * Converting ArrayData to RowData for calling {@link 
RowDataVectorizer#setColumn(int,
+     * ColumnVector, LogicalType, RowData, int)} recursively with array.
+     *
+     * @param arrayData input ArrayData.
+     * @param arrayFieldType LogicalType of input ArrayData.
+     * @return RowData.
+     */
+    private static RowData convert(ArrayData arrayData, LogicalType 
arrayFieldType) {

Review comment:
       OK, I have added a comment to explain the RowDataVectorizer.convert
function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to