lirui-apache commented on a change in pull request #12004:
URL: https://github.com/apache/flink/pull/12004#discussion_r421206166
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
##########
@@ -248,7 +263,11 @@ public RowData nextRecord(RowData reuse) throws
IOException {
}
@Override
- public void close() throws IOException {
+ public void close() {
Review comment:
What's the difference between `close` and `closeInputFormat`? And can we
have some comments explaining why we don't have to do anything in `close`?
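A minimal sketch of the kind of comment that would answer this, based on the generic Flink `InputFormat` lifecycle (close() per split, closeInputFormat() once per task); whether this PR really releases everything in `closeInputFormat` is an assumption on my side:

```java
@Override
public void close() {
	// Intentionally a no-op: close() is invoked after each split, while the
	// underlying reader/resources are released once in closeInputFormat(),
	// which Flink calls after all splits of this input format are processed.
}
```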
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTableSource;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.table.filesystem.TimestampPartTimeExtractor.toLocalDateTime;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link
HiveTableInputFormat},
+ * it is responsible for:
+ *
+ * <ol>
+ * <li>Monitoring partitions of hive meta store.</li>
+ * <li>Deciding which partitions should be further read and processed.</li>
+ * <li>Creating the {@link HiveTableInputSplit splits} corresponding to
those partitions.</li>
+ * <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link
ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in
ascending partition time order,
+ * based on the partition time of the partitions they belong to.
+ */
+public class HiveContinuousMonitoringFunction
+ extends RichSourceFunction<TimestampedHiveInputSplit>
+ implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
+
+ /** The parallelism of the downstream readers. */
+ private final int readerParallelism;
+
+ /** The interval between consecutive path scans. */
+ private final long interval;
+
+ private final HiveShim hiveShim;
+
+ private final JobConfWrapper conf;
+
+ private final ObjectPath tablePath;
+
+ private final List<String> partitionKeys;
+
+ private final String[] fieldNames;
+
+ private final DataType[] fieldTypes;
+
+ // consumer variables
+ private final String consumeOrder;
+ private final String consumeOffset;
+
+ // extractor variables
+ private final String extractorType;
+ private final String extractorClass;
+ private final String extractorPattern;
+
+ private volatile boolean isRunning = true;
+
+ /** The maximum partition read time seen so far. */
+ private volatile long currentReadTime;
+
+ private transient PartitionDiscovery.Context context;
+
+ private transient PartitionDiscovery fetcher;
+
+ private transient Object checkpointLock;
+
+ private transient ListState<Long> currReadTimeState;
+
+ private transient ListState<List<List<String>>> distinctPartsState;
+
+ private transient IMetaStoreClient client;
+
+ private transient Properties tableProps;
+
+ private transient String defaultPartitionName;
+
+ private transient Set<List<String>> distinctPartitions;
+
+ public HiveContinuousMonitoringFunction(
+ HiveShim hiveShim,
+ JobConf conf,
+ ObjectPath tablePath,
+ CatalogTable catalogTable,
+ int readerParallelism,
+ String consumeOrder,
+ String consumeOffset,
+ String extractorType,
+ String extractorClass,
+ String extractorPattern,
+ long interval) {
+ this.hiveShim = hiveShim;
+ this.conf = new JobConfWrapper(conf);
+ this.tablePath = tablePath;
+ this.partitionKeys = catalogTable.getPartitionKeys();
+ this.fieldNames = catalogTable.getSchema().getFieldNames();
+ this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
+ this.consumeOrder = consumeOrder;
+ this.extractorType = extractorType;
+ this.extractorClass = extractorClass;
+ this.extractorPattern = extractorPattern;
+ this.consumeOffset = consumeOffset;
+
+ this.interval = interval;
+ this.readerParallelism = Math.max(readerParallelism, 1);
+ this.currentReadTime = 0;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context)
throws Exception {
+ this.currReadTimeState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "partition-monitoring-state",
+ LongSerializer.INSTANCE
+ )
+ );
+ this.distinctPartsState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "partition-monitoring-state",
+ new ListSerializer<>(new
ListSerializer<>(StringSerializer.INSTANCE))
+ )
+ );
+
+ this.client = this.hiveShim.getHiveMetastoreClient(new
HiveConf(conf.conf(), HiveConf.class));
+
+ Table hiveTable = client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ this.tableProps =
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+ this.defaultPartitionName =
conf.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+ PartitionTimeExtractor extractor =
PartitionTimeExtractor.create(
+ getRuntimeContext().getUserCodeClassLoader(),
+ extractorType,
+ extractorClass,
+ extractorPattern);
+
+ this.fetcher = new DirectoryMonitorDiscovery();
+
+ Path location = new Path(hiveTable.getSd().getLocation());
+ FileSystem fs = location.getFileSystem(conf.conf());
+ this.context = new PartitionDiscovery.Context() {
+
+ @Override
+ public List<String> partitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
+ public Partition getPartition(List<String> partValues)
throws TException {
+ return client.getPartition(
Review comment:
I think it's possible that a partition folder exists in FS but not in
HMS, in which case that folder should simply be ignored instead of throwing an
exception.
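A minimal sketch of that suggestion (not the actual patch): catch the metastore's `org.apache.hadoop.hive.metastore.api.NoSuchObjectException` and treat the partition as absent; how the caller handles the `null` return is an assumption here.

```java
@Override
public Partition getPartition(List<String> partValues) throws TException {
	try {
		return client.getPartition(
				tablePath.getDatabaseName(),
				tablePath.getObjectName(),
				partValues);
	} catch (NoSuchObjectException e) {
		// The folder exists on the filesystem but the partition is not (yet)
		// registered in HMS; skip it for this scan instead of failing.
		return null;
	}
}
```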
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTableSource;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.table.filesystem.TimestampPartTimeExtractor.toLocalDateTime;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link
HiveTableInputFormat},
+ * it is responsible for:
+ *
+ * <ol>
+ * <li>Monitoring partitions of hive meta store.</li>
+ * <li>Deciding which partitions should be further read and processed.</li>
+ * <li>Creating the {@link HiveTableInputSplit splits} corresponding to
those partitions.</li>
+ * <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link
ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in
ascending partition time order,
+ * based on the partition time of the partitions they belong to.
+ */
+public class HiveContinuousMonitoringFunction
+ extends RichSourceFunction<TimestampedHiveInputSplit>
+ implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
+
+ /** The parallelism of the downstream readers. */
+ private final int readerParallelism;
+
+ /** The interval between consecutive path scans. */
+ private final long interval;
+
+ private final HiveShim hiveShim;
+
+ private final JobConfWrapper conf;
+
+ private final ObjectPath tablePath;
+
+ private final List<String> partitionKeys;
+
+ private final String[] fieldNames;
+
+ private final DataType[] fieldTypes;
+
+ // consumer variables
+ private final String consumeOrder;
+ private final String consumeOffset;
+
+ // extractor variables
+ private final String extractorType;
+ private final String extractorClass;
+ private final String extractorPattern;
+
+ private volatile boolean isRunning = true;
+
+ /** The maximum partition read time seen so far. */
+ private volatile long currentReadTime;
+
+ private transient PartitionDiscovery.Context context;
+
+ private transient PartitionDiscovery fetcher;
+
+ private transient Object checkpointLock;
+
+ private transient ListState<Long> currReadTimeState;
+
+ private transient ListState<List<List<String>>> distinctPartsState;
+
+ private transient IMetaStoreClient client;
+
+ private transient Properties tableProps;
+
+ private transient String defaultPartitionName;
+
+ private transient Set<List<String>> distinctPartitions;
+
+ public HiveContinuousMonitoringFunction(
+ HiveShim hiveShim,
+ JobConf conf,
+ ObjectPath tablePath,
+ CatalogTable catalogTable,
+ int readerParallelism,
+ String consumeOrder,
+ String consumeOffset,
+ String extractorType,
+ String extractorClass,
+ String extractorPattern,
+ long interval) {
+ this.hiveShim = hiveShim;
+ this.conf = new JobConfWrapper(conf);
+ this.tablePath = tablePath;
+ this.partitionKeys = catalogTable.getPartitionKeys();
+ this.fieldNames = catalogTable.getSchema().getFieldNames();
+ this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
+ this.consumeOrder = consumeOrder;
+ this.extractorType = extractorType;
+ this.extractorClass = extractorClass;
+ this.extractorPattern = extractorPattern;
+ this.consumeOffset = consumeOffset;
+
+ this.interval = interval;
+ this.readerParallelism = Math.max(readerParallelism, 1);
+ this.currentReadTime = 0;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context)
throws Exception {
+ this.currReadTimeState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "partition-monitoring-state",
+ LongSerializer.INSTANCE
+ )
+ );
+ this.distinctPartsState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "partition-monitoring-state",
+ new ListSerializer<>(new
ListSerializer<>(StringSerializer.INSTANCE))
+ )
+ );
+
+ this.client = this.hiveShim.getHiveMetastoreClient(new
HiveConf(conf.conf(), HiveConf.class));
+
+ Table hiveTable = client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ this.tableProps =
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+ this.defaultPartitionName =
conf.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+ PartitionTimeExtractor extractor =
PartitionTimeExtractor.create(
+ getRuntimeContext().getUserCodeClassLoader(),
+ extractorType,
+ extractorClass,
+ extractorPattern);
+
+ this.fetcher = new DirectoryMonitorDiscovery();
+
+ Path location = new Path(hiveTable.getSd().getLocation());
+ FileSystem fs = location.getFileSystem(conf.conf());
+ this.context = new PartitionDiscovery.Context() {
+
+ @Override
+ public List<String> partitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
+ public Partition getPartition(List<String> partValues)
throws TException {
+ return client.getPartition(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ partValues);
+ }
+
+ @Override
+ public FileSystem fileSystem() {
+ return fs;
+ }
+
+ @Override
+ public Path tableLocation() {
+ return new
Path(hiveTable.getSd().getLocation());
+ }
+
+ @Override
+ public long extractTimestamp(
+ List<String> partKeys,
+ List<String> partValues,
+ Supplier<Long> fileTime) {
+ switch (consumeOrder) {
+ case "create-time":
Review comment:
Should add constants for `create-time` and `partition-time`.
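For illustration, the switch could then read against named constants (constant names below are made up; compile-time String constants are valid switch labels):

```java
// Illustrative constant names; the actual names and location are up to the PR.
private static final String CONSUME_ORDER_CREATE_TIME = "create-time";
private static final String CONSUME_ORDER_PARTITION_TIME = "partition-time";

// ...
switch (consumeOrder) {
	case CONSUME_ORDER_CREATE_TIME:
		return fileTime.get();
	case CONSUME_ORDER_PARTITION_TIME:
		return toMills(extractor.extract(partKeys, partValues));
	default:
		throw new UnsupportedOperationException(
				"Unsupported consumer order: " + consumeOrder);
}
```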
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTableSource;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static
org.apache.flink.table.filesystem.TimestampPartTimeExtractor.toLocalDateTime;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link
HiveTableInputFormat},
+ * it is responsible for:
+ *
+ * <ol>
+ * <li>Monitoring partitions of hive meta store.</li>
+ * <li>Deciding which partitions should be further read and processed.</li>
+ * <li>Creating the {@link HiveTableInputSplit splits} corresponding to
those partitions.</li>
+ * <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link
ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in
ascending partition time order,
+ * based on the partition time of the partitions they belong to.
+ */
+public class HiveContinuousMonitoringFunction
+ extends RichSourceFunction<TimestampedHiveInputSplit>
+ implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
+
+ /** The parallelism of the downstream readers. */
+ private final int readerParallelism;
+
+ /** The interval between consecutive path scans. */
+ private final long interval;
+
+ private final HiveShim hiveShim;
+
+ private final JobConfWrapper conf;
+
+ private final ObjectPath tablePath;
+
+ private final List<String> partitionKeys;
+
+ private final String[] fieldNames;
+
+ private final DataType[] fieldTypes;
+
+ // consumer variables
+ private final String consumeOrder;
+ private final String consumeOffset;
+
+ // extractor variables
+ private final String extractorType;
+ private final String extractorClass;
+ private final String extractorPattern;
+
+ private volatile boolean isRunning = true;
+
+ /** The maximum partition read time seen so far. */
+ private volatile long currentReadTime;
+
+ private transient PartitionDiscovery.Context context;
+
+ private transient PartitionDiscovery fetcher;
+
+ private transient Object checkpointLock;
+
+ private transient ListState<Long> currReadTimeState;
+
+ private transient ListState<List<List<String>>> distinctPartsState;
+
+ private transient IMetaStoreClient client;
+
+ private transient Properties tableProps;
+
+ private transient String defaultPartitionName;
+
+ private transient Set<List<String>> distinctPartitions;
+
+ public HiveContinuousMonitoringFunction(
+ HiveShim hiveShim,
+ JobConf conf,
+ ObjectPath tablePath,
+ CatalogTable catalogTable,
+ int readerParallelism,
+ String consumeOrder,
+ String consumeOffset,
+ String extractorType,
+ String extractorClass,
+ String extractorPattern,
+ long interval) {
+ this.hiveShim = hiveShim;
+ this.conf = new JobConfWrapper(conf);
+ this.tablePath = tablePath;
+ this.partitionKeys = catalogTable.getPartitionKeys();
+ this.fieldNames = catalogTable.getSchema().getFieldNames();
+ this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
+ this.consumeOrder = consumeOrder;
+ this.extractorType = extractorType;
+ this.extractorClass = extractorClass;
+ this.extractorPattern = extractorPattern;
+ this.consumeOffset = consumeOffset;
+
+ this.interval = interval;
+ this.readerParallelism = Math.max(readerParallelism, 1);
+ this.currentReadTime = 0;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context)
throws Exception {
+ this.currReadTimeState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "partition-monitoring-state",
+ LongSerializer.INSTANCE
+ )
+ );
+ this.distinctPartsState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "partition-monitoring-state",
+ new ListSerializer<>(new
ListSerializer<>(StringSerializer.INSTANCE))
+ )
+ );
+
+ this.client = this.hiveShim.getHiveMetastoreClient(new
HiveConf(conf.conf(), HiveConf.class));
+
+ Table hiveTable = client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ this.tableProps =
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+ this.defaultPartitionName =
conf.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+ PartitionTimeExtractor extractor =
PartitionTimeExtractor.create(
+ getRuntimeContext().getUserCodeClassLoader(),
+ extractorType,
+ extractorClass,
+ extractorPattern);
+
+ this.fetcher = new DirectoryMonitorDiscovery();
+
+ Path location = new Path(hiveTable.getSd().getLocation());
+ FileSystem fs = location.getFileSystem(conf.conf());
+ this.context = new PartitionDiscovery.Context() {
+
+ @Override
+ public List<String> partitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
+ public Partition getPartition(List<String> partValues)
throws TException {
+ return client.getPartition(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ partValues);
+ }
+
+ @Override
+ public FileSystem fileSystem() {
+ return fs;
+ }
+
+ @Override
+ public Path tableLocation() {
+ return new
Path(hiveTable.getSd().getLocation());
+ }
+
+ @Override
+ public long extractTimestamp(
+ List<String> partKeys,
+ List<String> partValues,
+ Supplier<Long> fileTime) {
+ switch (consumeOrder) {
+ case "create-time":
+ return fileTime.get();
+ case "partition-time":
+ return
toMills(extractor.extract(partKeys, partValues));
+ default:
+ throw new
UnsupportedOperationException(
+ "Unsupported
consumer order: " + consumeOrder);
+ }
+ }
+ };
+
+ this.distinctPartitions = new HashSet<>();
+ if (context.isRestored()) {
+ LOG.info("Restoring state for the {}.",
getClass().getSimpleName());
+ this.currentReadTime =
this.currReadTimeState.get().iterator().next();
+
this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
+ } else {
+ LOG.info("No state to restore for the {}.",
getClass().getSimpleName());
+ if (consumeOffset != null) {
+ this.currentReadTime =
toMills(toLocalDateTime(consumeOffset));
+ }
+ }
+ }
+
+ @Override
+ public void run(SourceContext<TimestampedHiveInputSplit> context)
throws Exception {
+ checkpointLock = context.getCheckpointLock();
+ while (isRunning) {
+ synchronized (checkpointLock) {
+ monitorAndForwardSplits(context);
+ }
+ Thread.sleep(interval);
+ }
+ }
+
+ private void monitorAndForwardSplits(
+ SourceContext<TimestampedHiveInputSplit> context)
throws Exception {
+ assert (Thread.holdsLock(checkpointLock));
+
+ List<Tuple2<Partition, Long>> partitions =
fetcher.fetchPartitions(this.context, currentReadTime);
+
+ if (partitions.isEmpty()) {
+ return;
+ }
+
+ partitions.sort((o1, o2) -> (int) (o1.f1 - o2.f1));
+
+ long maxTimestamp = Long.MIN_VALUE;
+ Set<List<String>> nextDistinctParts = new HashSet<>();
+ for (Tuple2<Partition, Long> tuple2 : partitions) {
+ Partition partition = tuple2.f0;
+ List<String> partSpec = partition.getValues();
+ if (!this.distinctPartitions.contains(partSpec)) {
+
this.distinctPartitions.add(partition.getValues());
Review comment:
this.distinctPartitions.add(partSpec)?
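For reference, since `Set.add` already reports whether the element was new, the `contains` check and the insert could even be folded into one call (just an idea, not required for this comment):

```java
if (this.distinctPartitions.add(partSpec)) {
	// partSpec had not been seen before; build and forward its splits here.
}
```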
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTableSource;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import
org.apache.flink.connectors.hive.read.PartitionStrategy.PartitionStrategyFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link
HiveTableInputFormat},
+ * it is responsible for:
+ *
+ * <ol>
+ * <li>Monitoring partitions of hive meta store.</li>
+ * <li>Deciding which partitions should be further read and processed.</li>
+ * <li>Creating the {@link HiveTableInputSplit splits} corresponding to
those partitions.</li>
+ * <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link
ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in
ascending partition time order,
+ * based on the partition time of the partitions they belong to.
+ */
+public class HiveContinuousMonitoringFunction
+ extends RichSourceFunction<TimestampedHiveInputSplit>
+ implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
+
+ /** The parallelism of the downstream readers. */
+ private final int readerParallelism;
+
+ /** The interval between consecutive path scans. */
+ private final long interval;
+
+ private final HiveShim hiveShim;
+
+ private final JobConfWrapper conf;
+
+ private final ObjectPath tablePath;
+
+ private final List<String> partitionKeys;
+
+ private final String[] fieldNames;
+
+ private final DataType[] fieldTypes;
+
+ private final long startupTimestampMillis;
+
+ private final PartitionStrategyFactory partStrategyFactory;
+
+ private volatile boolean isRunning = true;
+
+ /** The maximum partition read time seen so far. */
+ private volatile long currentReadTime;
+
+ private transient PartitionStrategy strategy;
+
+ private transient Object checkpointLock;
+
+ private transient ListState<Long> currReadTimeState;
+
+ private transient ListState<List<List<String>>> distinctPartsState;
+
+ private transient IMetaStoreClient client;
+
+ private transient Properties tableProps;
+
+ private transient String defaultPartitionName;
+
+ private transient Set<List<String>> distinctPartitions;
+
+ public HiveContinuousMonitoringFunction(
+ HiveShim hiveShim,
+ JobConf conf,
+ ObjectPath tablePath,
+ CatalogTable catalogTable,
+ long startupTimestampMillis,
+ PartitionStrategyFactory partStrategyFactory,
+ int readerParallelism,
+ long interval) {
+ this.hiveShim = hiveShim;
+ this.conf = new JobConfWrapper(conf);
+ this.tablePath = tablePath;
+ this.partitionKeys = catalogTable.getPartitionKeys();
+ this.fieldNames = catalogTable.getSchema().getFieldNames();
+ this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
+ this.startupTimestampMillis = startupTimestampMillis;
+ this.partStrategyFactory = partStrategyFactory;
+
+ this.interval = interval;
+ this.readerParallelism = Math.max(readerParallelism, 1);
+ this.currentReadTime = Long.MIN_VALUE;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context)
throws Exception {
+ this.strategy =
partStrategyFactory.createStrategy(getRuntimeContext().getUserCodeClassLoader());
+
+ this.currReadTimeState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "partition-monitoring-state",
+ LongSerializer.INSTANCE
+ )
+ );
+ this.distinctPartsState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "partition-monitoring-state",
+ new ListSerializer<>(new
ListSerializer<>(StringSerializer.INSTANCE))
+ )
+ );
+
+ this.client = this.hiveShim.getHiveMetastoreClient(new
HiveConf(conf.conf(), HiveConf.class));
+
+ Table hiveTable = client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ this.tableProps =
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+ this.defaultPartitionName =
conf.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+ this.distinctPartitions = new HashSet<>();
+ if (context.isRestored()) {
+ LOG.info("Restoring state for the {}.",
getClass().getSimpleName());
+ this.currentReadTime =
this.currReadTimeState.get().iterator().next();
+
this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
+ } else {
+ LOG.info("No state to restore for the {}.",
getClass().getSimpleName());
+ this.currentReadTime = this.startupTimestampMillis;
+ }
+ }
+
+ @Override
+ public void run(SourceContext<TimestampedHiveInputSplit> context)
throws Exception {
+ checkpointLock = context.getCheckpointLock();
+ while (isRunning) {
+ synchronized (checkpointLock) {
+ monitorAndForwardSplits(context);
+ }
+ Thread.sleep(interval);
+ }
+ }
+
+ private void monitorAndForwardSplits(
+ SourceContext<TimestampedHiveInputSplit> context)
throws IOException, TException {
+ assert (Thread.holdsLock(checkpointLock));
+
+ List<Partition> partitions = client.listPartitionsByFilter(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ strategy.generateFetchFilter(partitionKeys,
currentReadTime),
+ (short) -1);
+
+ if (partitions.isEmpty()) {
+ return;
+ }
+
+ long maxTime = Long.MIN_VALUE;
+ for (Partition partition : partitions) {
+ List<String> partSpec = partition.getValues();
+ if (!this.distinctPartitions.contains(partSpec)) {
+
this.distinctPartitions.add(partition.getValues());
+ long time =
this.strategy.extractPartTime(partitionKeys, partition.getValues());
+ if (time > maxTime) {
+ maxTime = time;
+ }
+ HiveTableInputSplit[] splits =
HiveTableInputFormat.createInputSplits(
+ this.readerParallelism,
+
Collections.singletonList(toHiveTablePartition(partition)),
+ this.conf.conf());
+ for (HiveTableInputSplit split : splits) {
+ context.collect(new
TimestampedHiveInputSplit(time, split));
+ }
+ }
+ }
+ this.currentReadTime = maxTime;
+
+ this.distinctPartitions.removeIf(partSpec ->
this.strategy.canExpireForDistinct(
+ this.strategy.extractPartTime(partitionKeys,
partSpec),
+ this.currentReadTime));
+ }
+
+ private HiveTablePartition toHiveTablePartition(Partition p) {
+ return HiveTableSource.toHiveTablePartition(
+ partitionKeys, fieldNames, fieldTypes,
hiveShim, tableProps, defaultPartitionName, p);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws
Exception {
+ Preconditions.checkState(this.currReadTimeState != null,
+ "The " + getClass().getSimpleName() + " state
has not been properly initialized.");
+
+ this.currReadTimeState.clear();
+ this.currReadTimeState.add(this.currentReadTime);
+
+ this.distinctPartsState.clear();
+ this.distinctPartsState.add(new
ArrayList<>(this.distinctPartitions));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} checkpointed {}.",
getClass().getSimpleName(), currentReadTime);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ if (checkpointLock != null) {
+ synchronized (checkpointLock) {
+ currentReadTime = Long.MAX_VALUE;
+ isRunning = false;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ if (checkpointLock != null) {
+ // this is to cover the case where cancel() is called
before the run()
Review comment:
Let's move this comment outside of the if block, so that it's easier to
understand.
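Roughly what that would look like (the rest of `cancel()` is not quoted in this mail, so it is elided below):

```java
@Override
public void cancel() {
	// The null check covers the case where cancel() is called before run()
	// has assigned the checkpoint lock.
	if (checkpointLock != null) {
		// ...
	}
}
```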
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/SplitReader.java
##########
@@ -46,4 +46,17 @@
* @throws IOException Thrown, if an I/O error occurred.
*/
RowData nextRecord(RowData reuse) throws IOException;
+
+ /**
+ * Seek to a particular row number.
+ */
+ default void seekToRow(long rowCount, RowData reuse) throws IOException
{
+ for (int i = 0; i < rowCount; i++) {
+ boolean end = reachedEnd();
+ if (end) {
+ throw new RuntimeException("Seek to many
rows.");
Review comment:
"Seek too many rows." ?
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.source.TimestampedInputSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link HiveTableInputSplit} with {@link TimestampedInputSplit}.
+ * Kryo serializer can not deal with hadoop split, need specific type
information factory.
+ */
+@TypeInfo(TimestampedHiveInputSplit.SplitTypeInfoFactory.class)
+public class TimestampedHiveInputSplit extends HiveTableInputSplit implements
TimestampedInputSplit {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The modification time of the file this split belongs to. */
+ private final long modificationTime;
+
+ /**
+ * The state of the split. This information is used when
+ * restoring from a checkpoint and allows to resume reading the
+ * underlying file from the point we left off.
+ * */
+ private Serializable splitState;
+
+ public TimestampedHiveInputSplit(
+ long modificationTime,
+ HiveTableInputSplit split) {
+ super(
+ split.getSplitNumber(),
+ split.getHadoopInputSplit(),
+ split.getJobConf(),
+ split.getHiveTablePartition());
+ this.modificationTime = modificationTime;
+ }
+
+ @Override
+ public void setSplitState(Serializable state) {
+ this.splitState = state;
+ }
+
+ @Override
+ public Serializable getSplitState() {
+ return this.splitState;
+ }
+
+ @Override
+ public long getModificationTime() {
+ return modificationTime;
+ }
+
+ @Override
+ public int compareTo(TimestampedInputSplit o) {
+ TimestampedHiveInputSplit split = (TimestampedHiveInputSplit) o;
+ int modTimeComp = Long.compare(this.modificationTime,
split.modificationTime);
+ if (modTimeComp != 0L) {
+ return modTimeComp;
+ }
+
+ int pathComp =
this.hiveTablePartition.getStorageDescriptor().compareTo(
+
split.hiveTablePartition.getStorageDescriptor());
+
+ return pathComp != 0 ? pathComp :
+ this.getSplitNumber() - o.getSplitNumber();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
Review comment:
For two split instances, this means `equals` can return false while
`compareTo` returns 0. Is this intended behavior?
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.source.TimestampedInputSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link HiveTableInputSplit} with {@link TimestampedInputSplit}.
+ * Kryo serializer can not deal with hadoop split, need specific type
information factory.
+ */
+@TypeInfo(TimestampedHiveInputSplit.SplitTypeInfoFactory.class)
+public class TimestampedHiveInputSplit extends HiveTableInputSplit implements
TimestampedInputSplit {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The modification time of the file this split belongs to. */
+ private final long modificationTime;
+
+ /**
+ * The state of the split. This information is used when
+ * restoring from a checkpoint and allows to resume reading the
+ * underlying file from the point we left off.
+ * */
+ private Serializable splitState;
+
+ public TimestampedHiveInputSplit(
+ long modificationTime,
+ HiveTableInputSplit split) {
+ super(
+ split.getSplitNumber(),
+ split.getHadoopInputSplit(),
+ split.getJobConf(),
+ split.getHiveTablePartition());
+ this.modificationTime = modificationTime;
+ }
+
+ @Override
+ public void setSplitState(Serializable state) {
+ this.splitState = state;
+ }
+
+ @Override
+ public Serializable getSplitState() {
+ return this.splitState;
+ }
+
+ @Override
+ public long getModificationTime() {
+ return modificationTime;
+ }
+
+ @Override
+ public int compareTo(TimestampedInputSplit o) {
+ TimestampedHiveInputSplit split = (TimestampedHiveInputSplit) o;
+ int modTimeComp = Long.compare(this.modificationTime,
split.modificationTime);
+ if (modTimeComp != 0L) {
+ return modTimeComp;
+ }
+
+ int pathComp =
this.hiveTablePartition.getStorageDescriptor().compareTo(
Review comment:
I think this compares more than just paths.
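If a pure path comparison is what was intended, comparing the storage descriptors' locations would be narrower (a sketch; `getLocation()` is the accessor on the metastore `StorageDescriptor`):

```java
int pathComp = this.hiveTablePartition.getStorageDescriptor().getLocation().compareTo(
		split.hiveTablePartition.getStorageDescriptor().getLocation());
```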
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive)
connector.
+ */
+public class FileSystemOptions {
+
+ public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+ key("streaming-source.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable streaming
source or not.");
+
+ public static final ConfigOption<Duration>
STREAMING_SOURCE_MONITOR_INTERVAL =
+ key("streaming-source.monitor-interval")
+ .durationType()
+ .defaultValue(Duration.ofMillis(1))
+ .withDescription("The minimum interval
allowed between consecutive partition/file discovery.");
Review comment:
Why is it the minimum interval? It may seem confusing to users.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive)
connector.
+ */
+public class FileSystemOptions {
+
+ public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
Review comment:
Wondering what the relationship is between this and the execution mode?
E.g. can I enable the streaming source in batch execution mode?
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive)
connector.
+ */
+public class FileSystemOptions {
+
+ public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+ key("streaming-source.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable streaming
source or not.");
+
+ public static final ConfigOption<Duration>
STREAMING_SOURCE_MONITOR_INTERVAL =
+ key("streaming-source.monitor-interval")
+ .durationType()
+ .defaultValue(Duration.ofMillis(1))
+ .withDescription("The minimum interval
allowed between consecutive partition/file discovery.");
+
+ public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER
=
+ key("streaming-source.consume-order")
+ .stringType()
+ .defaultValue("create-time")
+ .withDescription("The consume order of
streaming source," +
+ " support create-time
and partition-time." +
+ " create-time compare
partition/file creation time;" +
+ " partition-time
compare time represented by partition name.");
+
+ public static final ConfigOption<String>
STREAMING_SOURCE_CONSUME_START_OFFSET =
+ key("streaming-source.consume-start-offset")
+ .stringType()
+ .defaultValue("1970-00-00")
+ .withDescription("Start offset for
streaming consuming." +
+ " How to parse and
compare offsets depends on your order." +
+ " For create-time and
partition-time, should be a timestamp string.");
+
+ public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+ key("partition.time-extractor.type")
+ .stringType()
+ .defaultValue("timestamp")
+ .withDescription("Time extractor to
extract time from partition values." +
Review comment:
Should we clarify that this is only used if `STREAMING_SOURCE_CONSUME_ORDER` is
set to `partition-time`?
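For example, the description could spell that out; the wording below is only a suggestion:

```java
.withDescription("Time extractor to extract time from partition values. " +
		"Only used when 'streaming-source.consume-order' is set to 'partition-time'. " +
		"Supports timestamp and custom. " +
		"For timestamp, a timestamp pattern can be configured. " +
		"For custom, an extractor class should be configured.")
```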
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive)
connector.
+ */
+public class FileSystemOptions {
+
+ public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+ key("streaming-source.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable streaming
source or not.");
+
+ public static final ConfigOption<Duration>
STREAMING_SOURCE_MONITOR_INTERVAL =
+ key("streaming-source.monitor-interval")
+ .durationType()
+ .defaultValue(Duration.ofMillis(1))
+ .withDescription("The minimum interval
allowed between consecutive partition/file discovery.");
+
+ public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER
=
+ key("streaming-source.consume-order")
+ .stringType()
+ .defaultValue("create-time")
+ .withDescription("The consume order of
streaming source," +
+ " support create-time
and partition-time." +
+ " create-time compare
partition/file creation time;" +
+ " partition-time
compare time represented by partition name.");
+
+ public static final ConfigOption<String>
STREAMING_SOURCE_CONSUME_START_OFFSET =
+ key("streaming-source.consume-start-offset")
+ .stringType()
+ .defaultValue("1970-00-00")
+ .withDescription("Start offset for
streaming consuming." +
+ " How to parse and
compare offsets depends on your order." +
+ " For create-time and
partition-time, should be a timestamp string.");
+
+ public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+ key("partition.time-extractor.type")
+ .stringType()
+ .defaultValue("timestamp")
+ .withDescription("Time extractor to
extract time from partition values." +
+ " Support timestamp and
custom." +
+ " For timestamp, can
configure timestamp pattern." +
+ " For custom, should
configure extractor class.");
+
+ public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_CLASS
=
+ key("partition.time-extractor.class")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The extractor class
for implement PartitionTimeExtractor interface.");
+
+ public static final ConfigOption<String>
PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN =
+ key("partition.time-extractor.timestamp-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The 'timestamp'
construction way allows users to use partition" +
+ " fields to get a legal
timestamp pattern." +
+ " Default support
'yyyy-mm-dd hh:mm:ss' from first field." +
+ " If timestamp in
partition is single field 'dt', can configure: '$dt'." +
+ " If timestamp in
partition is year, month, day, hour," +
+ " can configure:
'$year-$month-$day $hour:00:00'." +
+ " If timestamp in
partition is dt and hour, can configure: '$dt $hour:00:00'.");
+
+ public static final ConfigOption<Duration> PARTITION_TIME_INTERVAL =
Review comment:
Seems unused
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive)
connector.
+ */
+public class FileSystemOptions {
+
+ public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+ key("streaming-source.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable streaming
source or not.");
+
+ public static final ConfigOption<Duration>
STREAMING_SOURCE_MONITOR_INTERVAL =
+ key("streaming-source.monitor-interval")
+ .durationType()
+ .defaultValue(Duration.ofMillis(1))
+ .withDescription("The minimum interval
allowed between consecutive partition/file discovery.");
+
+ public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER
=
+ key("streaming-source.consume-order")
+ .stringType()
+ .defaultValue("create-time")
+ .withDescription("The consume order of
streaming source," +
+ " support create-time
and partition-time." +
+ " create-time compare
partition/file creation time;" +
+ " partition-time
compare time represented by partition name.");
+
+ public static final ConfigOption<String>
STREAMING_SOURCE_CONSUME_START_OFFSET =
+ key("streaming-source.consume-start-offset")
+ .stringType()
+ .defaultValue("1970-00-00")
+ .withDescription("Start offset for
streaming consuming." +
+ " How to parse and
compare offsets depends on your order." +
+ " For create-time and
partition-time, should be a timestamp string.");
+
+ public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+ key("partition.time-extractor.type")
+ .stringType()
+ .defaultValue("timestamp")
+ .withDescription("Time extractor to
extract time from partition values." +
+ " Support timestamp and
custom." +
Review comment:
I find `timestamp` somewhat difficult to understand. According to the
`PartitionTimeExtractor` interface, any implementation extracts a
"timestamp", so we may as well just call it `default`?
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTimeExtractor.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * Time extractor to extract time from partition values.
+ */
+@Experimental
+public interface PartitionTimeExtractor extends Serializable {
+
+ /**
+ * Extract time from partition keys and values.
+ */
+ LocalDateTime extract(List<String> partitionKeys, List<String>
partitionValues);
+
+ static PartitionTimeExtractor create(
+ ClassLoader userClassLoader,
+ String extractorType,
+ String extractorClass,
+ String extractorPattern) {
+ switch (extractorType) {
+ case "timestamp":
Review comment:
Should add constants for these values
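A sketch of what that could look like; names are illustrative, and interface fields are implicitly `public static final`, so they work as switch labels:

```java
// Could live on the PartitionTimeExtractor interface itself.
String EXTRACTOR_TYPE_TIMESTAMP = "timestamp";
String EXTRACTOR_TYPE_CUSTOM = "custom";
```

The existing `switch (extractorType)` in `create`, and the default value of `partition.time-extractor.type`, could then reference these constants instead of repeating the string literals.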
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive)
connector.
+ */
+public class FileSystemOptions {
+
+ public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+ key("streaming-source.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable streaming
source or not.");
+
+ public static final ConfigOption<Duration>
STREAMING_SOURCE_MONITOR_INTERVAL =
+ key("streaming-source.monitor-interval")
+ .durationType()
+ .defaultValue(Duration.ofMillis(1))
+ .withDescription("The minimum interval
allowed between consecutive partition/file discovery.");
+
+ public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER
=
+ key("streaming-source.consume-order")
+ .stringType()
+ .defaultValue("create-time")
+ .withDescription("The consume order of
streaming source," +
+ " support create-time
and partition-time." +
+ " create-time compare
partition/file creation time;" +
Review comment:
I think we'd better clarify that this is not the partition creation time in
HMS, but the folder/file creation time in the filesystem?
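A possible rewording of the description along those lines (just a suggestion):

```java
.withDescription("The consume order of streaming source, supports create-time and partition-time. " +
		"create-time compares the folder/file creation time in the filesystem " +
		"(not the partition creation time in HMS); " +
		"partition-time compares the time parsed from the partition name.")
```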
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]