This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c33e1ad  [FLINK-19992][hive] Integrate new orc to Hive source
c33e1ad is described below

commit c33e1adbd401a2030d51863c940b8822f06005e5
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Nov 7 10:24:02 2020 +0800

    [FLINK-19992][hive] Integrate new orc to Hive source
    
    This closes #13939
---
 .../hive/read/HiveBulkFormatAdapter.java           | 79 ++++++++++++++----
 .../nohive/OrcNoHiveColumnarRowInputFormat.java    | 93 ++++++++++++++++++++++
 2 files changed, 158 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
index 8eeb5a4..f07f2f0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
@@ -25,6 +25,9 @@ import 
org.apache.flink.connector.file.src.util.ArrayResultIterator;
 import org.apache.flink.connectors.hive.HiveTablePartition;
 import org.apache.flink.connectors.hive.JobConfWrapper;
 import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
+import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
+import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
@@ -45,6 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -65,6 +69,9 @@ public class HiveBulkFormatAdapter implements 
BulkFormat<RowData, HiveSourceSpli
        private static final String SCHEMA_EVOLUTION_COLUMNS = 
"schema.evolution.columns";
        private static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = 
"schema.evolution.columns.types";
 
+       private static final PartitionFieldExtractor<HiveSourceSplit> 
PARTITION_FIELD_EXTRACTOR =
+                       (split, fieldName, fieldType) -> 
split.getHiveTablePartition().getPartitionSpec().get(fieldName);
+
        private final JobConfWrapper jobConfWrapper;
        private final List<String> partitionKeys;
        private final String[] fieldNames;
@@ -107,24 +114,68 @@ public class HiveBulkFormatAdapter implements 
BulkFormat<RowData, HiveSourceSpli
                return InternalTypeInfo.of(producedRowType);
        }
 
+       private RowType tableRowType() {
+               LogicalType[] types = 
Arrays.stream(fieldTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);
+               return RowType.of(types, fieldNames);
+       }
+
        private BulkFormat<RowData, ? super HiveSourceSplit> 
createBulkFormatForSplit(HiveSourceSplit split) {
                if (!useMapRedReader && 
useParquetVectorizedRead(split.getHiveTablePartition())) {
-                       PartitionFieldExtractor<HiveSourceSplit> extractor = 
(PartitionFieldExtractor<HiveSourceSplit>)
-                                       (split1, fieldName, fieldType) -> 
split1.getHiveTablePartition().getPartitionSpec().get(fieldName);
                        return 
ParquetColumnarRowInputFormat.createPartitionedFormat(
                                        jobConfWrapper.conf(),
                                        producedRowType,
                                        partitionKeys,
-                                       extractor,
+                                       PARTITION_FIELD_EXTRACTOR,
                                        DEFAULT_SIZE,
                                        hiveVersion.startsWith("3"),
                                        false
                        );
+               } else if (!useMapRedReader && 
useOrcVectorizedRead(split.getHiveTablePartition())) {
+                       return createOrcFormat();
                } else {
                        return new HiveMapRedBulkFormat();
                }
        }
 
+       private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> 
createOrcFormat() {
+               return hiveVersion.startsWith("1.") ?
+                               
OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
+                                               jobConfWrapper.conf(),
+                                               tableRowType(),
+                                               partitionKeys,
+                                               PARTITION_FIELD_EXTRACTOR,
+                                               computeSelectedFields(),
+                                               Collections.emptyList(),
+                                               DEFAULT_SIZE) :
+                               
OrcColumnarRowFileInputFormat.createPartitionedFormat(
+                                               OrcShim.createShim(hiveVersion),
+                                               jobConfWrapper.conf(),
+                                               tableRowType(),
+                                               partitionKeys,
+                                               PARTITION_FIELD_EXTRACTOR,
+                                               computeSelectedFields(),
+                                               Collections.emptyList(),
+                                               DEFAULT_SIZE);
+       }
+
+       private boolean useOrcVectorizedRead(HiveTablePartition partition) {
+               boolean isOrc = 
partition.getStorageDescriptor().getSerdeInfo().getSerializationLib()
+                               .toLowerCase().contains("orc");
+               if (!isOrc) {
+                       return false;
+               }
+
+               for (RowType.RowField field : producedRowType.getFields()) {
+                       if (isVectorizationUnsupported(field.getType())) {
+                               LOG.info("Fallback to hadoop mapred reader, 
unsupported field type: " + field.getType());
+                               return false;
+                       }
+               }
+
+               LOG.info("Use flink orc ColumnarRowData reader.");
+               return true;
+       }
+
        private boolean useParquetVectorizedRead(HiveTablePartition partition) {
                boolean isParquet = 
partition.getStorageDescriptor().getSerdeInfo().getSerializationLib()
                                .toLowerCase().contains("parquet");
@@ -278,18 +329,18 @@ public class HiveBulkFormatAdapter implements 
BulkFormat<RowData, HiveSourceSpli
                                        .collect(Collectors.joining(","));
                        
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIDs);
                }
+       }
 
-               // compute indices of selected fields according to the produced 
type
-               private int[] computeSelectedFields() {
-                       int[] selectedFields = new 
int[producedRowType.getFieldCount()];
-                       for (int i = 0; i < selectedFields.length; i++) {
-                               String name = 
producedRowType.getFieldNames().get(i);
-                               int index = 
Arrays.asList(fieldNames).indexOf(name);
-                               Preconditions.checkState(index >= 0,
-                                               "Produced field name %s not 
found in table schema fields %s", name, Arrays.toString(fieldNames));
-                               selectedFields[i] = index;
-                       }
-                       return selectedFields;
+       // compute indices of selected fields according to the produced type
+       private int[] computeSelectedFields() {
+               int[] selectedFields = new int[producedRowType.getFieldCount()];
+               for (int i = 0; i < selectedFields.length; i++) {
+                       String name = producedRowType.getFieldNames().get(i);
+                       int index = Arrays.asList(fieldNames).indexOf(name);
+                       Preconditions.checkState(index >= 0,
+                                       "Produced field name %s not found in 
table schema fields %s", name, Arrays.toString(fieldNames));
+                       selectedFields[i] = index;
                }
+               return selectedFields;
        }
 }
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
new file mode 100644
index 0000000..2e187e2
--- /dev/null
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.nohive;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
+import org.apache.flink.orc.vector.ColumnBatchFactory;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.filesystem.PartitionFieldExtractor;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.orc.OrcSplitReaderUtil.convertToOrcTypeWithPart;
+import static org.apache.flink.orc.OrcSplitReaderUtil.getNonPartNames;
+import static org.apache.flink.orc.OrcSplitReaderUtil.getSelectedOrcFields;
+import static 
org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector.createFlinkVector;
+import static 
org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVectorFromConstant;
+
+/**
+ * Helper class to create {@link OrcColumnarRowFileInputFormat} for no-hive.
+ */
+public class OrcNoHiveColumnarRowInputFormat {
+       private OrcNoHiveColumnarRowInputFormat() {}
+
+       /**
+        * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the 
partition columns can be
+        * generated by split.
+        */
+       public static <SplitT extends FileSourceSplit> 
OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> 
createPartitionedFormat(
+                       Configuration hadoopConfig,
+                       RowType tableType,
+                       List<String> partitionKeys,
+                       PartitionFieldExtractor<SplitT> extractor,
+                       int[] selectedFields,
+                       List<OrcFilters.Predicate> conjunctPredicates,
+                       int batchSize) {
+               String[] tableFieldNames = 
tableType.getFieldNames().toArray(new String[0]);
+               LogicalType[] tableFieldTypes = 
tableType.getChildren().toArray(new LogicalType[0]);
+               List<String> orcFieldNames = getNonPartNames(tableFieldNames, 
partitionKeys);
+               int[] orcSelectedFields = getSelectedOrcFields(tableFieldNames, 
selectedFields, orcFieldNames);
+
+               ColumnBatchFactory<VectorizedRowBatch, SplitT> batchGenerator = 
(SplitT split, VectorizedRowBatch rowBatch) -> {
+                       // create and initialize the row batch
+                       ColumnVector[] vectors = new 
ColumnVector[selectedFields.length];
+                       for (int i = 0; i < vectors.length; i++) {
+                               String name = 
tableFieldNames[selectedFields[i]];
+                               LogicalType type = 
tableFieldTypes[selectedFields[i]];
+                               vectors[i] = partitionKeys.contains(name) ?
+                                               createFlinkVectorFromConstant(
+                                                               type, 
extractor.extract(split, name, type), batchSize) :
+                                               
createFlinkVector(rowBatch.cols[orcFieldNames.indexOf(name)]);
+                       }
+                       return new VectorizedColumnBatch(vectors);
+               };
+
+               return new OrcColumnarRowFileInputFormat<>(
+                               new OrcNoHiveShim(),
+                               hadoopConfig,
+                               convertToOrcTypeWithPart(tableFieldNames, 
tableFieldTypes, partitionKeys),
+                               orcSelectedFields,
+                               conjunctPredicates,
+                               batchSize,
+                               batchGenerator,
+                               new 
RowType(Arrays.stream(selectedFields).mapToObj(i ->
+                                               
tableType.getFields().get(i)).collect(Collectors.toList())));
+       }
+}

Reply via email to