garyli1019 commented on a change in pull request #2613: URL: https://github.com/apache/hudi/pull/2613#discussion_r585502738
########## File path: hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamWriteOperatorFactory; +import org.apache.hudi.operator.compact.CompactFunction; +import org.apache.hudi.operator.compact.CompactionCommitEvent; +import org.apache.hudi.operator.compact.CompactionCommitSink; +import org.apache.hudi.operator.compact.CompactionPlanEvent; +import org.apache.hudi.operator.compact.CompactionPlanOperator; +import org.apache.hudi.operator.partitioner.BucketAssignFunction; +import org.apache.hudi.operator.transform.RowDataToHoodieFunction; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import 
org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.PartitionableTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Map; + +/** + * Hoodie table sink. + */ +public class HoodieTableSink implements AppendStreamTableSink<RowData>, PartitionableTableSink { Review comment: Is this for the batch job? ########## File path: hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.HoodieROTablePathFilter; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.source.format.FilePathUtils; +import org.apache.hudi.source.format.cow.CopyOnWriteInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.source.format.mor.MergeOnReadTableState; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.avro.Schema; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.FilePathFilter; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.CollectionInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import 
org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter; +import org.apache.flink.table.sources.FilterableTableSource; +import org.apache.flink.table.sources.LimitableTableSource; +import org.apache.flink.table.sources.PartitionableTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes; +import static org.apache.hudi.source.format.FormatUtils.getParquetConf; + +/** + * Hoodie batch table source that always read the latest snapshot of the underneath table. 
+ */ +public class HoodieTableSource implements + StreamTableSource<RowData>, + PartitionableTableSource, + ProjectableTableSource<RowData>, + LimitableTableSource<RowData>, + FilterableTableSource<RowData> { + private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSource.class); + + private final transient org.apache.hadoop.conf.Configuration hadoopConf; + private final transient HoodieTableMetaClient metaClient; + private final long maxCompactionMemoryInBytes; + + private final TableSchema schema; + private final Path path; + private final List<String> partitionKeys; + private final String defaultPartName; + private final Configuration conf; + + private final int[] requiredPos; + private final long limit; + private final List<Expression> filters; + + private List<Map<String, String>> requiredPartitions; + + public HoodieTableSource( + TableSchema schema, Review comment: Does the user have to define the schema when using this? In Spark, we use the TableSchemaResolver to get the latest schema from the FileSystem. ########## File path: hudi-flink/src/main/java/org/apache/hudi/source/format/cow/AbstractColumnReader.java ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hudi.source.format.cow; + +import org.apache.flink.formats.parquet.vector.ParquetDictionary; +import org.apache.flink.formats.parquet.vector.reader.ColumnReader; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; +import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; + +/** + * Abstract {@link ColumnReader}. + * See {@link org.apache.parquet.column.impl.ColumnReaderImpl}, + * part of the code is referred from Apache Spark and Apache Parquet. + * + * <p>Note: Reference Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader} + * because some of the package scope methods. + */ +public abstract class AbstractColumnReader<V extends WritableColumnVector> Review comment: Why do we need a separate reader? Does Flink have its own reader for Parquet?
########## File path: hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java ########## @@ -73,11 +118,13 @@ private FlinkOptions() { .noDefaultValue() .withDescription("Table name to register to Hive metastore"); + public static final String TABLE_TYPE_COPY_ON_WRITE = "COPY_ON_WRITE"; Review comment: can we link the String to `HoodieTableType`? ########## File path: hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java ########## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.HoodieROTablePathFilter; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.source.format.FilePathUtils; +import org.apache.hudi.source.format.cow.CopyOnWriteInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.source.format.mor.MergeOnReadTableState; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.avro.Schema; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.FilePathFilter; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.CollectionInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import 
org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter; +import org.apache.flink.table.sources.FilterableTableSource; +import org.apache.flink.table.sources.LimitableTableSource; +import org.apache.flink.table.sources.PartitionableTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes; +import static org.apache.hudi.source.format.FormatUtils.getParquetConf; + +/** + * Hoodie batch table source that always read the latest snapshot of the underneath table. 
+ */ +public class HoodieTableSource implements + StreamTableSource<RowData>, + PartitionableTableSource, + ProjectableTableSource<RowData>, + LimitableTableSource<RowData>, + FilterableTableSource<RowData> { + private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSource.class); + + private static final int NO_LIMIT_CONSTANT = -1; + + private final transient org.apache.hadoop.conf.Configuration hadoopConf; + private final transient HoodieTableMetaClient metaClient; + private final long maxCompactionMemoryInBytes; + + private final TableSchema schema; + private final Path path; + private final List<String> partitionKeys; + private final String defaultPartName; + private final Configuration conf; + + private final int[] requiredPos; + private final long limit; + private final List<Expression> filters; + + private List<Map<String, String>> requiredPartitions; + + public HoodieTableSource( + TableSchema schema, + Path path, + List<String> partitionKeys, + String defaultPartName, + Configuration conf) { + this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null); + } + + public HoodieTableSource( + TableSchema schema, + Path path, + List<String> partitionKeys, + String defaultPartName, + Configuration conf, + @Nullable List<Map<String, String>> requiredPartitions, + @Nullable int[] requiredPos, + @Nullable Long limit, + @Nullable List<Expression> filters) { + this.schema = schema; + this.path = path; + this.partitionKeys = partitionKeys; + this.defaultPartName = defaultPartName; + this.conf = conf; + this.requiredPartitions = requiredPartitions; + this.requiredPos = requiredPos == null + ? IntStream.range(0, schema.getFieldCount()).toArray() + : requiredPos; + this.limit = limit == null ? NO_LIMIT_CONSTANT : limit; + this.filters = filters == null ? 
Collections.emptyList() : filters; + final String basePath = this.conf.getString(FlinkOptions.PATH); + this.hadoopConf = StreamerUtil.getHadoopConf(); + this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); + this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf)); + } + + @Override + public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) { + @SuppressWarnings("unchecked") + TypeInformation<RowData> typeInfo = + (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); + InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); + DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo); + return source.name(explainSource()); + } + + @Override + public boolean isBounded() { + return true; + } + + @Override + public TableSource<RowData> applyPredicate(List<Expression> predicates) { + return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf, + requiredPartitions, requiredPos, limit, new ArrayList<>(predicates)); + } + + @Override + public boolean isFilterPushedDown() { + return this.filters != null && this.filters.size() > 0; + } + + @Override + public boolean isLimitPushedDown() { + return this.limit != NO_LIMIT_CONSTANT; + } + + @Override + public TableSource<RowData> applyLimit(long limit) { + return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf, + requiredPartitions, requiredPos, limit, filters); + } + + @Override + public List<Map<String, String>> getPartitions() { + try { + return FilePathUtils + .searchPartKeyValueAndPaths( + path.getFileSystem(), + path, + conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION), + partitionKeys.toArray(new String[0])) + .stream() + .map(tuple2 -> tuple2.f0) + .map(spec -> { + LinkedHashMap<String, String> ret = new LinkedHashMap<>(); + spec.forEach((k, v) -> 
ret.put(k, defaultPartName.equals(v) ? null : v)); + return ret; + }) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new TableException("Fetch partitions fail.", e); + } + } + + @Override + public TableSource applyPartitionPruning(List<Map<String, String>> requiredPartitions) { + return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf, + requiredPartitions, requiredPos, limit, filters); + } + + @Override + public TableSource<RowData> projectFields(int[] requiredPos) { + return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, conf, + requiredPartitions, requiredPos, limit, filters); + } + + @Override + public TableSchema getTableSchema() { + return schema; + } + + @Override + public DataType getProducedDataType() { + String[] schemaFieldNames = this.schema.getFieldNames(); + DataType[] schemaTypes = this.schema.getFieldDataTypes(); + + return DataTypes.ROW(Arrays.stream(this.requiredPos) + .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i])) + .toArray(DataTypes.Field[]::new)) + .bridgedTo(RowData.class); + } + + private List<Map<String, String>> getOrFetchPartitions() { + if (requiredPartitions == null) { + requiredPartitions = getPartitions(); + } + return requiredPartitions; + } + + private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) { + FileStatus[] fileStatuses = Arrays.stream(paths) + .flatMap(path -> Arrays.stream(FilePathUtils.getFileStatusRecursivelyV2(path, 1, hadoopConf))) + .toArray(FileStatus[]::new); + if (fileStatuses.length == 0) { + throw new HoodieException("No files found for reading in user provided path."); + } + + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants(), fileStatuses); + List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList()); + String latestCommit = fsView.getLastInstant().get().getTimestamp(); + 
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE); + final AtomicInteger cnt = new AtomicInteger(0); + if (latestFiles.size() > 0) { + Map<HoodieBaseFile, List<String>> fileGroup = + HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(hadoopConf, latestFiles); + return fileGroup.entrySet().stream().map(kv -> { + HoodieBaseFile baseFile = kv.getKey(); + Option<List<String>> logPaths = kv.getValue().size() == 0 + ? Option.empty() + : Option.of(kv.getValue()); + return new MergeOnReadInputSplit(cnt.getAndAdd(1), + baseFile.getPath(), logPaths, latestCommit, + metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType); + }).collect(Collectors.toList()); + } else { + // all the files are logs Review comment: how is this possible? ########## File path: hudi-flink/src/main/java/org/apache/hudi/source/format/FormatUtils.java ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hudi.source.format; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * Utilities for format. + */ +public class FormatUtils { + private FormatUtils() { + } + + public static GenericRecord buildAvroRecordBySchema( + IndexedRecord record, + Schema requiredSchema, + int[] requiredPos, + GenericRecordBuilder recordBuilder) { + List<Schema.Field> requiredFields = requiredSchema.getFields(); + assert (requiredFields.size() == requiredPos.length); + Iterator<Integer> positionIterator = Arrays.stream(requiredPos).iterator(); + requiredFields.forEach(f -> recordBuilder.set(f, record.get(positionIterator.next()))); + return recordBuilder.build(); + } + + public static HoodieMergedLogRecordScanner scanLog( + MergeOnReadInputSplit split, + Schema logSchema, + Configuration config) { + FileSystem fs = FSUtils.getFs(split.getTablePath(), config); + return HoodieMergedLogRecordScanner.newBuilder() Review comment: This duplicates the Spark code; maybe we can move it to a shared place? ########## File path: hudi-flink/src/main/java/org/apache/hudi/source/format/cow/CopyOnWriteInputFormat.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.format.cow; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.SerializableConfiguration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.utils.PartitionPathUtils; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; + +import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE; +import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType; + +/** + * An implementation of {@link FileInputFormat} to read {@link RowData} records + * from Parquet files. + * + * <p>Note: Reference Flink release 1.11.2 + * {@code org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory.ParquetInputFormat} + * to support TIMESTAMP_MILLIS. + * + * @see ParquetSplitReaderUtil + */ +public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { Review comment: Why not extend `ParquetInputFormat`?
########## File path: hudi-flink/src/main/java/org/apache/hudi/source/format/cow/ParquetColumnarRowSplitReader.java ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.format.cow; + +import org.apache.flink.formats.parquet.vector.reader.ColumnReader; +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.GroupType; +import 
org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.hudi.source.format.cow.ParquetSplitReaderUtil.createColumnReader; +import static org.apache.hudi.source.format.cow.ParquetSplitReaderUtil.createWritableColumnVector; +import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; +import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; +import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; + +/** + * This reader is used to read a {@link VectorizedColumnBatch} from input split. + * + * <p>Note: Reference Flink release 1.11.2 + * {@code org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader} + * because it is package scope. + */ +public class ParquetColumnarRowSplitReader implements Closeable { Review comment: IMO we put too much dependency on the File Reader. It will be hard to maintain over time. I am more inclined to reuse the reader from the compute engine side. WDYT? ########## File path: hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java ########## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.format; + +import org.apache.hudi.common.fs.FSUtils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.TableException; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Reference the Flink {@link org.apache.flink.table.utils.PartitionPathUtils} + * but supports simple partition path besides the Hive style. + */ +public class FilePathUtils { + + private static final Pattern HIVE_PARTITION_NAME_PATTERN = Pattern.compile("([^/]+)=([^/]+)"); + + private static final BitSet CHAR_TO_ESCAPE = new BitSet(128); + + static { + for (char c = 0; c < ' '; c++) { + CHAR_TO_ESCAPE.set(c); + } + + /* + * ASCII 01-1F are HTTP control characters that need to be escaped. + * \u000A and \u000D are \n and \r, respectively. 
+ */ + char[] clist = new char[] {'\u0001', '\u0002', '\u0003', '\u0004', + '\u0005', '\u0006', '\u0007', '\u0008', '\u0009', '\n', '\u000B', + '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', + '\u0013', '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', + '\u001A', '\u001B', '\u001C', '\u001D', '\u001E', '\u001F', + '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{', + '[', ']', '^'}; + + for (char c : clist) { + CHAR_TO_ESCAPE.set(c); + } + } + + private static boolean needsEscaping(char c) { + return c < CHAR_TO_ESCAPE.size() && CHAR_TO_ESCAPE.get(c); + } + + /** + * Make partition path from partition spec. + * + * @param partitionKVs The partition key value mapping + * @param hiveStylePartition Whether the partition path is with Hive style, + * e.g. {partition key} = {partition value} + * @return an escaped, valid partition name + */ + public static String generatePartitionPath( + LinkedHashMap<String, String> partitionKVs, + boolean hiveStylePartition) { + if (partitionKVs.isEmpty()) { + return ""; + } + StringBuilder suffixBuf = new StringBuilder(); + int i = 0; + for (Map.Entry<String, String> e : partitionKVs.entrySet()) { + if (i > 0) { + suffixBuf.append(Path.SEPARATOR); + } + if (hiveStylePartition) { + suffixBuf.append(escapePathName(e.getKey())); + suffixBuf.append('='); + } + suffixBuf.append(escapePathName(e.getValue())); + i++; + } + suffixBuf.append(Path.SEPARATOR); + return suffixBuf.toString(); + } + + /** + * Escapes a path name. + * + * @param path The path to escape. + * @return An escaped path name. 
+ */ + private static String escapePathName(String path) { + if (path == null || path.length() == 0) { + throw new TableException("Path should not be null or empty: " + path); + } + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < path.length(); i++) { + char c = path.charAt(i); + if (needsEscaping(c)) { + sb.append('%'); + sb.append(String.format("%1$02X", (int) c)); + } else { + sb.append(c); + } + } + return sb.toString(); + } + + /** + * Generates partition values from path. + * + * @param currPath Partition file path + * @param hivePartition Whether the partition path is with Hive style + * @param partitionKeys Partition keys + * @return Sequential partition specs. + */ + public static List<String> extractPartitionValues( + Path currPath, + boolean hivePartition, + String[] partitionKeys) { + return new ArrayList<>(extractPartitionKeyValues(currPath, hivePartition, partitionKeys).values()); + } + + /** + * Generates partition key value mapping from path. + * + * @param currPath Partition file path + * @param hivePartition Whether the partition path is with Hive style + * @param partitionKeys Partition keys + * @return Sequential partition specs. 
+ */ + public static LinkedHashMap<String, String> extractPartitionKeyValues( + Path currPath, + boolean hivePartition, + String[] partitionKeys) { + LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>(); + List<String[]> kvs = new ArrayList<>(); + int curDepth = 0; + do { + String component = currPath.getName(); + final String[] kv = new String[2]; + if (hivePartition) { + Matcher m = HIVE_PARTITION_NAME_PATTERN.matcher(component); + if (m.matches()) { + String k = unescapePathName(m.group(1)); + String v = unescapePathName(m.group(2)); + kv[0] = k; + kv[1] = v; + } + } else { + kv[0] = partitionKeys[partitionKeys.length - 1 - curDepth]; + kv[1] = unescapePathName(component); + } + kvs.add(kv); + currPath = currPath.getParent(); + curDepth++; + } while (currPath != null && !currPath.getName().isEmpty() && curDepth < partitionKeys.length); + + // reverse the list since we checked the part from leaf dir to table's base dir + for (int i = kvs.size(); i > 0; i--) { + fullPartSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]); + } + + return fullPartSpec; + } + + public static String unescapePathName(String path) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < path.length(); i++) { + char c = path.charAt(i); + if (c == '%' && i + 2 < path.length()) { + int code = -1; + try { + code = Integer.parseInt(path.substring(i + 1, i + 3), 16); + } catch (Exception ignored) { + // do nothing + } + if (code >= 0) { + sb.append((char) code); + i += 2; + continue; + } + } + sb.append(c); + } + return sb.toString(); + } + + /** + * Search all partitions in this path. 
+ * + * @param fs File system + * @param path Search path + * @param hivePartition Whether the partition path is with Hive style + * @param partitionKeys Partition keys + * @return all partition key value mapping in sequence of the given path + */ + public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartKeyValueAndPaths( + FileSystem fs, + Path path, + boolean hivePartition, + String[] partitionKeys) { + // expectLevel start from 0, E.G. base_path/level0/level1/level2 + FileStatus[] generatedParts = getFileStatusRecursively(path, partitionKeys.length, fs); + List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>(); + for (FileStatus part : generatedParts) { + ret.add( + new Tuple2<>( + extractPartitionKeyValues(part.getPath(), hivePartition, partitionKeys), + part.getPath())); + } + return ret; + } + + private static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, FileSystem fs) { + ArrayList<FileStatus> result = new ArrayList<>(); + + try { + FileStatus fileStatus = fs.getFileStatus(path); + listStatusRecursively(fs, fileStatus, 0, expectLevel, result); + } catch (IOException ignore) { + return new FileStatus[0]; + } + + return result.toArray(new FileStatus[0]); + } + + private static void listStatusRecursively( + FileSystem fs, + FileStatus fileStatus, + int level, + int expectLevel, + List<FileStatus> results) throws IOException { + if (expectLevel == level && !isHiddenFile(fileStatus)) { + results.add(fileStatus); + return; + } + + if (fileStatus.isDir() && !isHiddenFile(fileStatus)) { + for (FileStatus stat : fs.listStatus(fileStatus.getPath())) { + listStatusRecursively(fs, stat, level + 1, expectLevel, results); + } + } + } + + private static boolean isHiddenFile(FileStatus fileStatus) { + String name = fileStatus.getPath().getName(); + return name.startsWith("_") || name.startsWith("."); + } + + /** + * Same as getFileStatusRecursively but returns hadoop {@link org.apache.hadoop.fs.FileStatus}s. 
+ */ + public static org.apache.hadoop.fs.FileStatus[] getFileStatusRecursivelyV2( Review comment: how about `getHadoopFileStatusRecursively` ########## File path: hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java ########## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.format; + +import org.apache.hudi.common.fs.FSUtils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.TableException; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Reference the Flink {@link org.apache.flink.table.utils.PartitionPathUtils} + * but supports simple partition path besides the Hive style. 
+ */ +public class FilePathUtils { + + private static final Pattern HIVE_PARTITION_NAME_PATTERN = Pattern.compile("([^/]+)=([^/]+)"); + + private static final BitSet CHAR_TO_ESCAPE = new BitSet(128); + + static { + for (char c = 0; c < ' '; c++) { + CHAR_TO_ESCAPE.set(c); + } + + /* + * ASCII 01-1F are HTTP control characters that need to be escaped. + * \u000A and \u000D are \n and \r, respectively. + */ + char[] clist = new char[] {'\u0001', '\u0002', '\u0003', '\u0004', + '\u0005', '\u0006', '\u0007', '\u0008', '\u0009', '\n', '\u000B', + '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', + '\u0013', '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', + '\u001A', '\u001B', '\u001C', '\u001D', '\u001E', '\u001F', + '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{', + '[', ']', '^'}; + + for (char c : clist) { + CHAR_TO_ESCAPE.set(c); + } + } + + private static boolean needsEscaping(char c) { + return c < CHAR_TO_ESCAPE.size() && CHAR_TO_ESCAPE.get(c); + } + + /** + * Make partition path from partition spec. + * + * @param partitionKVs The partition key value mapping + * @param hiveStylePartition Whether the partition path is with Hive style, + * e.g. {partition key} = {partition value} + * @return an escaped, valid partition name + */ + public static String generatePartitionPath( + LinkedHashMap<String, String> partitionKVs, + boolean hiveStylePartition) { + if (partitionKVs.isEmpty()) { + return ""; + } + StringBuilder suffixBuf = new StringBuilder(); + int i = 0; + for (Map.Entry<String, String> e : partitionKVs.entrySet()) { + if (i > 0) { + suffixBuf.append(Path.SEPARATOR); + } + if (hiveStylePartition) { + suffixBuf.append(escapePathName(e.getKey())); + suffixBuf.append('='); + } + suffixBuf.append(escapePathName(e.getValue())); + i++; + } + suffixBuf.append(Path.SEPARATOR); + return suffixBuf.toString(); + } + + /** + * Escapes a path name. + * + * @param path The path to escape. + * @return An escaped path name. 
+ */ + private static String escapePathName(String path) { + if (path == null || path.length() == 0) { + throw new TableException("Path should not be null or empty: " + path); + } + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < path.length(); i++) { + char c = path.charAt(i); + if (needsEscaping(c)) { + sb.append('%'); + sb.append(String.format("%1$02X", (int) c)); + } else { + sb.append(c); + } + } + return sb.toString(); + } + + /** + * Generates partition values from path. + * + * @param currPath Partition file path + * @param hivePartition Whether the partition path is with Hive style + * @param partitionKeys Partition keys + * @return Sequential partition specs. + */ + public static List<String> extractPartitionValues( + Path currPath, + boolean hivePartition, + String[] partitionKeys) { + return new ArrayList<>(extractPartitionKeyValues(currPath, hivePartition, partitionKeys).values()); + } + + /** + * Generates partition key value mapping from path. + * + * @param currPath Partition file path + * @param hivePartition Whether the partition path is with Hive style + * @param partitionKeys Partition keys + * @return Sequential partition specs. 
+ */ + public static LinkedHashMap<String, String> extractPartitionKeyValues( + Path currPath, + boolean hivePartition, + String[] partitionKeys) { + LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>(); + List<String[]> kvs = new ArrayList<>(); + int curDepth = 0; + do { + String component = currPath.getName(); + final String[] kv = new String[2]; + if (hivePartition) { + Matcher m = HIVE_PARTITION_NAME_PATTERN.matcher(component); + if (m.matches()) { + String k = unescapePathName(m.group(1)); + String v = unescapePathName(m.group(2)); + kv[0] = k; + kv[1] = v; + } + } else { + kv[0] = partitionKeys[partitionKeys.length - 1 - curDepth]; + kv[1] = unescapePathName(component); + } + kvs.add(kv); + currPath = currPath.getParent(); + curDepth++; + } while (currPath != null && !currPath.getName().isEmpty() && curDepth < partitionKeys.length); + + // reverse the list since we checked the part from leaf dir to table's base dir + for (int i = kvs.size(); i > 0; i--) { + fullPartSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]); + } + + return fullPartSpec; + } + + public static String unescapePathName(String path) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < path.length(); i++) { + char c = path.charAt(i); + if (c == '%' && i + 2 < path.length()) { + int code = -1; + try { + code = Integer.parseInt(path.substring(i + 1, i + 3), 16); + } catch (Exception ignored) { + // do nothing + } + if (code >= 0) { + sb.append((char) code); + i += 2; + continue; + } + } + sb.append(c); + } + return sb.toString(); + } + + /** + * Search all partitions in this path. 
+ * + * @param fs File system + * @param path Search path + * @param hivePartition Whether the partition path is with Hive style + * @param partitionKeys Partition keys + * @return all partition key value mapping in sequence of the given path + */ + public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartKeyValueAndPaths( + FileSystem fs, + Path path, + boolean hivePartition, + String[] partitionKeys) { + // expectLevel start from 0, E.G. base_path/level0/level1/level2 + FileStatus[] generatedParts = getFileStatusRecursively(path, partitionKeys.length, fs); + List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>(); + for (FileStatus part : generatedParts) { + ret.add( + new Tuple2<>( + extractPartitionKeyValues(part.getPath(), hivePartition, partitionKeys), + part.getPath())); + } + return ret; + } + + private static FileStatus[] getFileStatusRecursively(Path path, int expectLevel, FileSystem fs) { + ArrayList<FileStatus> result = new ArrayList<>(); + + try { + FileStatus fileStatus = fs.getFileStatus(path); + listStatusRecursively(fs, fileStatus, 0, expectLevel, result); + } catch (IOException ignore) { + return new FileStatus[0]; + } + + return result.toArray(new FileStatus[0]); + } + + private static void listStatusRecursively( + FileSystem fs, + FileStatus fileStatus, + int level, + int expectLevel, + List<FileStatus> results) throws IOException { + if (expectLevel == level && !isHiddenFile(fileStatus)) { + results.add(fileStatus); + return; + } + + if (fileStatus.isDir() && !isHiddenFile(fileStatus)) { + for (FileStatus stat : fs.listStatus(fileStatus.getPath())) { + listStatusRecursively(fs, stat, level + 1, expectLevel, results); + } + } + } + + private static boolean isHiddenFile(FileStatus fileStatus) { + String name = fileStatus.getPath().getName(); + return name.startsWith("_") || name.startsWith("."); + } + + /** + * Same as getFileStatusRecursively but returns hadoop {@link org.apache.hadoop.fs.FileStatus}s. 
Review comment: return hadoop FileStatus? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
