This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c33e1ad [FLINK-19992][hive] Integrate new orc to Hive source
c33e1ad is described below
commit c33e1adbd401a2030d51863c940b8822f06005e5
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Nov 7 10:24:02 2020 +0800
[FLINK-19992][hive] Integrate new orc to Hive source
This closes #13939
---
.../hive/read/HiveBulkFormatAdapter.java | 79 ++++++++++++++----
.../nohive/OrcNoHiveColumnarRowInputFormat.java | 93 ++++++++++++++++++++++
2 files changed, 158 insertions(+), 14 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
index 8eeb5a4..f07f2f0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
@@ -25,6 +25,9 @@ import org.apache.flink.connector.file.src.util.ArrayResultIterator;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
+import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
+import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
@@ -45,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -65,6 +69,9 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
private static final String SCHEMA_EVOLUTION_COLUMNS =
"schema.evolution.columns";
private static final String SCHEMA_EVOLUTION_COLUMNS_TYPES =
"schema.evolution.columns.types";
+ private static final PartitionFieldExtractor<HiveSourceSplit>
PARTITION_FIELD_EXTRACTOR =
+ (split, fieldName, fieldType) ->
split.getHiveTablePartition().getPartitionSpec().get(fieldName);
+
private final JobConfWrapper jobConfWrapper;
private final List<String> partitionKeys;
private final String[] fieldNames;
@@ -107,24 +114,68 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
return InternalTypeInfo.of(producedRowType);
}
+ private RowType tableRowType() {
+ LogicalType[] types =
Arrays.stream(fieldTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);
+ return RowType.of(types, fieldNames);
+ }
+
private BulkFormat<RowData, ? super HiveSourceSplit>
createBulkFormatForSplit(HiveSourceSplit split) {
if (!useMapRedReader &&
useParquetVectorizedRead(split.getHiveTablePartition())) {
- PartitionFieldExtractor<HiveSourceSplit> extractor =
(PartitionFieldExtractor<HiveSourceSplit>)
- (split1, fieldName, fieldType) ->
split1.getHiveTablePartition().getPartitionSpec().get(fieldName);
return
ParquetColumnarRowInputFormat.createPartitionedFormat(
jobConfWrapper.conf(),
producedRowType,
partitionKeys,
- extractor,
+ PARTITION_FIELD_EXTRACTOR,
DEFAULT_SIZE,
hiveVersion.startsWith("3"),
false
);
+ } else if (!useMapRedReader &&
useOrcVectorizedRead(split.getHiveTablePartition())) {
+ return createOrcFormat();
} else {
return new HiveMapRedBulkFormat();
}
}
+ private OrcColumnarRowFileInputFormat<?, HiveSourceSplit>
createOrcFormat() {
+ return hiveVersion.startsWith("1.") ?
+
OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
+ jobConfWrapper.conf(),
+ tableRowType(),
+ partitionKeys,
+ PARTITION_FIELD_EXTRACTOR,
+ computeSelectedFields(),
+ Collections.emptyList(),
+ DEFAULT_SIZE) :
+
OrcColumnarRowFileInputFormat.createPartitionedFormat(
+ OrcShim.createShim(hiveVersion),
+ jobConfWrapper.conf(),
+ tableRowType(),
+ partitionKeys,
+ PARTITION_FIELD_EXTRACTOR,
+ computeSelectedFields(),
+ Collections.emptyList(),
+ DEFAULT_SIZE);
+ }
+
+ private boolean useOrcVectorizedRead(HiveTablePartition partition) {
+ boolean isOrc =
partition.getStorageDescriptor().getSerdeInfo().getSerializationLib()
+ .toLowerCase().contains("orc");
+ if (!isOrc) {
+ return false;
+ }
+
+ for (RowType.RowField field : producedRowType.getFields()) {
+ if (isVectorizationUnsupported(field.getType())) {
+ LOG.info("Fallback to hadoop mapred reader,
unsupported field type: " + field.getType());
+ return false;
+ }
+ }
+
+ LOG.info("Use flink orc ColumnarRowData reader.");
+ return true;
+ }
+
private boolean useParquetVectorizedRead(HiveTablePartition partition) {
boolean isParquet =
partition.getStorageDescriptor().getSerdeInfo().getSerializationLib()
.toLowerCase().contains("parquet");
@@ -278,18 +329,18 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
.collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIDs);
}
+ }
- // compute indices of selected fields according to the produced
type
- private int[] computeSelectedFields() {
- int[] selectedFields = new
int[producedRowType.getFieldCount()];
- for (int i = 0; i < selectedFields.length; i++) {
- String name =
producedRowType.getFieldNames().get(i);
- int index =
Arrays.asList(fieldNames).indexOf(name);
- Preconditions.checkState(index >= 0,
- "Produced field name %s not
found in table schema fields %s", name, Arrays.toString(fieldNames));
- selectedFields[i] = index;
- }
- return selectedFields;
+ // compute indices of selected fields according to the produced type
+ private int[] computeSelectedFields() {
+ int[] selectedFields = new int[producedRowType.getFieldCount()];
+ for (int i = 0; i < selectedFields.length; i++) {
+ String name = producedRowType.getFieldNames().get(i);
+ int index = Arrays.asList(fieldNames).indexOf(name);
+ Preconditions.checkState(index >= 0,
+ "Produced field name %s not found in
table schema fields %s", name, Arrays.toString(fieldNames));
+ selectedFields[i] = index;
}
+ return selectedFields;
}
}
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
new file mode 100644
index 0000000..2e187e2
--- /dev/null
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.nohive;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.OrcFilters;
+import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
+import org.apache.flink.orc.vector.ColumnBatchFactory;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.filesystem.PartitionFieldExtractor;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.orc.OrcSplitReaderUtil.convertToOrcTypeWithPart;
+import static org.apache.flink.orc.OrcSplitReaderUtil.getNonPartNames;
+import static org.apache.flink.orc.OrcSplitReaderUtil.getSelectedOrcFields;
+import static
org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector.createFlinkVector;
+import static
org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVectorFromConstant;
+
+/**
+ * Helper class to create {@link OrcColumnarRowFileInputFormat} for no-hive.
+ */
+public class OrcNoHiveColumnarRowInputFormat {
+ private OrcNoHiveColumnarRowInputFormat() {}
+
+ /**
+ * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the
partition columns can be
+ * generated by split.
+ */
+ public static <SplitT extends FileSourceSplit>
OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>
createPartitionedFormat(
+ Configuration hadoopConfig,
+ RowType tableType,
+ List<String> partitionKeys,
+ PartitionFieldExtractor<SplitT> extractor,
+ int[] selectedFields,
+ List<OrcFilters.Predicate> conjunctPredicates,
+ int batchSize) {
+ String[] tableFieldNames =
tableType.getFieldNames().toArray(new String[0]);
+ LogicalType[] tableFieldTypes =
tableType.getChildren().toArray(new LogicalType[0]);
+ List<String> orcFieldNames = getNonPartNames(tableFieldNames,
partitionKeys);
+ int[] orcSelectedFields = getSelectedOrcFields(tableFieldNames,
selectedFields, orcFieldNames);
+
+ ColumnBatchFactory<VectorizedRowBatch, SplitT> batchGenerator =
(SplitT split, VectorizedRowBatch rowBatch) -> {
+ // create and initialize the row batch
+ ColumnVector[] vectors = new
ColumnVector[selectedFields.length];
+ for (int i = 0; i < vectors.length; i++) {
+ String name =
tableFieldNames[selectedFields[i]];
+ LogicalType type =
tableFieldTypes[selectedFields[i]];
+ vectors[i] = partitionKeys.contains(name) ?
+ createFlinkVectorFromConstant(
+ type,
extractor.extract(split, name, type), batchSize) :
+
createFlinkVector(rowBatch.cols[orcFieldNames.indexOf(name)]);
+ }
+ return new VectorizedColumnBatch(vectors);
+ };
+
+ return new OrcColumnarRowFileInputFormat<>(
+ new OrcNoHiveShim(),
+ hadoopConfig,
+ convertToOrcTypeWithPart(tableFieldNames,
tableFieldTypes, partitionKeys),
+ orcSelectedFields,
+ conjunctPredicates,
+ batchSize,
+ batchGenerator,
+ new
RowType(Arrays.stream(selectedFields).mapToObj(i ->
+
tableType.getFields().get(i)).collect(Collectors.toList())));
+ }
+}