This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fc581da [FLINK-19888][hive] Migrate Hive source to FLIP-27 source
interface for streaming
fc581da is described below
commit fc581da73fcb9bbaa443604d36498c9a5dbdf0cd
Author: Rui Li <[email protected]>
AuthorDate: Sun Nov 8 21:13:37 2020 +0800
[FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for
streaming
This closes #13963
---
.../connector/file/src/AbstractFileSource.java | 14 ++
.../file/src/ContinuousEnumerationSettings.java | 4 +-
.../file/src/PendingSplitsCheckpoint.java | 4 +-
.../ContinuousHivePendingSplitsCheckpoint.java | 53 +++++
...nuousHivePendingSplitsCheckpointSerializer.java | 139 ++++++++++++
.../hive/ContinuousHiveSplitEnumerator.java | 230 ++++++++++++++++++++
.../apache/flink/connectors/hive/HiveSource.java | 205 +++++++++++++++---
.../flink/connectors/hive/HiveTableSource.java | 195 +++++------------
.../read/HiveContinuousMonitoringFunction.java | 241 ---------------------
.../hive/read/HiveContinuousPartitionFetcher.java | 2 +-
.../filesystem/ContinuousPartitionFetcher.java | 4 +-
.../flink/table/filesystem/PartitionFetcher.java | 2 +-
12 files changed, 669 insertions(+), 424 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
index 147691b..46d3d37 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
@@ -100,6 +100,20 @@ public abstract class AbstractFileSource<T, SplitT extends
FileSourceSplit>
}
//
------------------------------------------------------------------------
+ // Getters
+ //
------------------------------------------------------------------------
+
+ public FileSplitAssigner.Provider getAssignerFactory() {
+ return assignerFactory;
+ }
+
+ @Nullable
+ public ContinuousEnumerationSettings getContinuousEnumerationSettings()
{
+ return continuousEnumerationSettings;
+ }
+
+
+ //
------------------------------------------------------------------------
// Source API Methods
//
------------------------------------------------------------------------
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
index 4123c43..72ab516 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
@@ -30,13 +30,13 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* file source's continuous discovery and streaming mode.
*/
@Internal
-final class ContinuousEnumerationSettings implements Serializable {
+public final class ContinuousEnumerationSettings implements Serializable {
private static final long serialVersionUID = 1L;
private final Duration discoveryInterval;
- ContinuousEnumerationSettings(Duration discoveryInterval) {
+ public ContinuousEnumerationSettings(Duration discoveryInterval) {
this.discoveryInterval = checkNotNull(discoveryInterval);
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
index 847f99e..5950df2 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
@@ -33,7 +33,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* A checkpoint of the current state of the containing the currently pending
splits that are not yet assigned.
*/
@PublicEvolving
-public final class PendingSplitsCheckpoint<SplitT extends FileSourceSplit> {
+public class PendingSplitsCheckpoint<SplitT extends FileSourceSplit> {
/** The splits in the checkpoint. */
private final Collection<SplitT> splits;
@@ -48,7 +48,7 @@ public final class PendingSplitsCheckpoint<SplitT extends
FileSourceSplit> {
@Nullable
byte[] serializedFormCache;
- private PendingSplitsCheckpoint(Collection<SplitT> splits,
Collection<Path> alreadyProcessedPaths) {
+ protected PendingSplitsCheckpoint(Collection<SplitT> splits,
Collection<Path> alreadyProcessedPaths) {
this.splits = Collections.unmodifiableCollection(splits);
this.alreadyProcessedPaths =
Collections.unmodifiableCollection(alreadyProcessedPaths);
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpoint.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpoint.java
new file mode 100644
index 0000000..e562c75
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpoint.java
@@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connectors.hive;

import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
 * Checkpointed state of continuous Hive source reading: the pending splits (inherited from
 * {@link PendingSplitsCheckpoint}) plus the current partition read offset and the partition
 * specs already seen at that offset.
 */
public class ContinuousHivePendingSplitsCheckpoint extends PendingSplitsCheckpoint<HiveSourceSplit> {

	// The maximum partition read offset consumed so far.
	private final Comparable<?> currentReadOffset;
	// Partition specs (partition value lists) already processed for the current offset;
	// kept so re-discovered partitions are not enumerated twice after restore.
	private final Collection<List<String>> seenPartitionsSinceOffset;

	public ContinuousHivePendingSplitsCheckpoint(
			Collection<HiveSourceSplit> splits,
			Comparable<?> currentReadOffset,
			Collection<List<String>> seenPartitionsSinceOffset) {
		// The continuous Hive enumerator tracks seen partitions itself, so no
		// already-processed paths are stored in the super checkpoint.
		super(new ArrayList<>(splits), Collections.emptyList());
		this.currentReadOffset = currentReadOffset;
		// Snapshot the caller's collection and expose it read-only.
		List<List<String>> seenSnapshot = new ArrayList<>(seenPartitionsSinceOffset);
		this.seenPartitionsSinceOffset = Collections.unmodifiableCollection(seenSnapshot);
	}

	public Comparable<?> getCurrentReadOffset() {
		return currentReadOffset;
	}

	public Collection<List<String>> getSeenPartitionsSinceOffset() {
		return seenPartitionsSinceOffset;
	}
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
new file mode 100644
index 0000000..ab3f54f
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
@@ -0,0 +1,139 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connectors.hive;

import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * SerDe for {@link ContinuousHivePendingSplitsCheckpoint}.
 *
 * <p>Wire layout (version 1): length-prefixed bytes of the super
 * {@link PendingSplitsCheckpoint} (produced by {@link PendingSplitsCheckpointSerializer}),
 * followed by the Java-serialized read offset, followed by the count and then each seen
 * partition spec serialized as a {@code List<String>}.
 */
public class ContinuousHivePendingSplitsCheckpointSerializer implements SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> {

	private static final int VERSION = 1;

	// Handles the splits + already-processed-paths portion of the checkpoint.
	private final PendingSplitsCheckpointSerializer<HiveSourceSplit> superSerDe;
	// SerDe for a single partition (a partition spec is a list of partition values)
	private final ListSerializer<String> partitionSerDe = new ListSerializer<>(StringSerializer.INSTANCE);
	// SerDe for the current read offset
	private final ReadOffsetSerDe readOffsetSerDe = ReadOffsetSerDeImpl.INSTANCE;

	public ContinuousHivePendingSplitsCheckpointSerializer(SimpleVersionedSerializer<HiveSourceSplit> splitSerDe) {
		superSerDe = new PendingSplitsCheckpointSerializer<>(splitSerDe);
	}

	@Override
	public int getVersion() {
		return VERSION;
	}

	/**
	 * Serializes the checkpoint. Only {@link ContinuousHivePendingSplitsCheckpoint} instances
	 * are supported; any other subtype is rejected with an IllegalArgumentException.
	 */
	@Override
	public byte[] serialize(PendingSplitsCheckpoint<HiveSourceSplit> checkpoint) throws IOException {
		Preconditions.checkArgument(checkpoint.getClass() == ContinuousHivePendingSplitsCheckpoint.class,
				"Only supports %s", ContinuousHivePendingSplitsCheckpoint.class.getName());

		ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = (ContinuousHivePendingSplitsCheckpoint) checkpoint;
		// Re-wrap the inherited state into a plain super checkpoint so the super SerDe can
		// handle it; the Hive-specific fields are appended after it.
		PendingSplitsCheckpoint<HiveSourceSplit> superCP = PendingSplitsCheckpoint.fromCollectionSnapshot(
				hiveCheckpoint.getSplits(), hiveCheckpoint.getAlreadyProcessedPaths());
		byte[] superBytes = superSerDe.serialize(superCP);
		ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

		try (DataOutputViewStreamWrapper outputWrapper = new DataOutputViewStreamWrapper(byteArrayOutputStream)) {
			// Length prefix lets the deserializer slice out the super checkpoint bytes.
			outputWrapper.writeInt(superBytes.length);
			outputWrapper.write(superBytes);
			readOffsetSerDe.serialize(hiveCheckpoint.getCurrentReadOffset(), outputWrapper);
			outputWrapper.writeInt(hiveCheckpoint.getSeenPartitionsSinceOffset().size());
			for (List<String> partition : hiveCheckpoint.getSeenPartitionsSinceOffset()) {
				partitionSerDe.serialize(partition, outputWrapper);
			}
		}
		return byteArrayOutputStream.toByteArray();
	}

	@Override
	public PendingSplitsCheckpoint<HiveSourceSplit> deserialize(int version, byte[] serialized) throws IOException {
		if (version == 1) {
			try (DataInputViewStreamWrapper inputWrapper = new DataInputViewStreamWrapper(new ByteArrayInputStream(serialized))) {
				return deserializeV1(inputWrapper);
			}
		}
		throw new IOException("Unknown version: " + version);
	}

	// Reads the version-1 layout written by serialize() above, in the same field order.
	private PendingSplitsCheckpoint<HiveSourceSplit> deserializeV1(DataInputViewStreamWrapper inputWrapper) throws IOException {
		byte[] superBytes = new byte[inputWrapper.readInt()];
		inputWrapper.readFully(superBytes);
		// NOTE(review): the nested serializer's version is not written during serialize(), so
		// the CURRENT version is assumed here — confirm this stays compatible if
		// PendingSplitsCheckpointSerializer is ever upgraded.
		PendingSplitsCheckpoint<HiveSourceSplit> superCP = superSerDe.deserialize(superSerDe.getVersion(), superBytes);
		try {
			Comparable<?> currentReadOffset = readOffsetSerDe.deserialize(inputWrapper);
			int numParts = inputWrapper.readInt();
			List<List<String>> parts = new ArrayList<>(numParts);
			for (int i = 0; i < numParts; i++) {
				parts.add(partitionSerDe.deserialize(inputWrapper));
			}
			return new ContinuousHivePendingSplitsCheckpoint(superCP.getSplits(), currentReadOffset, parts);
		} catch (ClassNotFoundException e) {
			// The read offset is an arbitrary Java-serialized Comparable; its class may be
			// missing from the current classloader.
			throw new IOException("Failed to deserialize " + getClass().getName(), e);
		}
	}

	/** Default {@link ReadOffsetSerDe} based on plain Java serialization of the offset object. */
	private static class ReadOffsetSerDeImpl implements ReadOffsetSerDe {

		private static final ReadOffsetSerDeImpl INSTANCE = new ReadOffsetSerDeImpl();

		private ReadOffsetSerDeImpl() {
		}

		@Override
		public void serialize(Comparable<?> offset, OutputStream outputStream) throws IOException {
			InstantiationUtil.serializeObject(outputStream, offset);
		}

		@Override
		public Comparable<?> deserialize(InputStream inputStream) throws IOException, ClassNotFoundException {
			// Uses the context classloader so user-defined offset classes can be resolved.
			return InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
		}
	}

	/**
	 * SerDe for the current read offset.
	 */
	interface ReadOffsetSerDe {

		void serialize(Comparable<?> offset, OutputStream outputStream) throws IOException;

		Comparable<?> deserialize(InputStream inputStream) throws IOException, ClassNotFoundException;
	}
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
new file mode 100644
index 0000000..628203f
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
@@ -0,0 +1,230 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connectors.hive;

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.event.RequestSplitEvent;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;

import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A continuously monitoring {@link SplitEnumerator} for Hive source. It periodically fetches
 * newly appeared partitions via a {@link ContinuousPartitionFetcher}, generates splits for
 * them, and lazily assigns splits to readers that have requested one.
 *
 * @param <T> the type of the partition read offset (e.g. create time or partition-name order)
 */
public class ContinuousHiveSplitEnumerator<T extends Comparable<T>> implements SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {

	private static final Logger LOG = LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);

	private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
	// Readers waiting for a split, keyed by subtask id, value = requested host name.
	// LinkedHashMap keeps request order so assignment is first-come-first-served.
	private final LinkedHashMap<Integer, String> readersAwaitingSplit;
	private final FileSplitAssigner splitAssigner;
	// Interval (ms) between partition-discovery runs.
	private final long discoveryInterval;

	private final JobConf jobConf;
	private final ObjectPath tablePath;

	// Fetches new partitions beyond the current read offset.
	private final ContinuousPartitionFetcher<Partition, T> fetcher;
	private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;

	// Guards currentReadOffset, seenPartitionsSinceOffset and the split assigner, since
	// discovery results and checkpointing may touch them concurrently.
	private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
	// the maximum partition read offset seen so far
	private volatile T currentReadOffset;
	// the partitions that have been processed for the current read offset
	private final Set<List<String>> seenPartitionsSinceOffset;

	/**
	 * Creates the enumerator.
	 *
	 * @param currentReadOffset offset to resume discovery from (start offset on a fresh run)
	 * @param seenPartitionsSinceOffset partition specs already processed for that offset
	 * @param splitAssigner assigner pre-loaded with any splits restored from a checkpoint
	 */
	public ContinuousHiveSplitEnumerator(
			SplitEnumeratorContext<HiveSourceSplit> enumeratorContext,
			T currentReadOffset,
			Collection<List<String>> seenPartitionsSinceOffset,
			FileSplitAssigner splitAssigner,
			long discoveryInterval,
			JobConf jobConf,
			ObjectPath tablePath,
			ContinuousPartitionFetcher<Partition, T> fetcher,
			HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
		this.enumeratorContext = enumeratorContext;
		this.currentReadOffset = currentReadOffset;
		this.seenPartitionsSinceOffset = new HashSet<>(seenPartitionsSinceOffset);
		this.splitAssigner = splitAssigner;
		this.discoveryInterval = discoveryInterval;
		this.jobConf = jobConf;
		this.tablePath = tablePath;
		this.fetcher = fetcher;
		this.fetcherContext = fetcherContext;
		readersAwaitingSplit = new LinkedHashMap<>();
	}

	/**
	 * Opens the fetcher context and schedules periodic partition discovery on the
	 * coordinator's async worker; results are handled back on the coordinator thread.
	 */
	@Override
	public void start() {
		try {
			fetcherContext.open();
			enumeratorContext.callAsync(
					this::monitorAndGetSplits,
					this::handleNewSplits,
					discoveryInterval,
					discoveryInterval);
		} catch (Exception e) {
			throw new FlinkHiveException("Failed to start continuous split enumerator", e);
		}
	}

	/** Records a reader's split request and tries to serve it immediately. */
	@Override
	public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
		if (sourceEvent instanceof RequestSplitEvent) {
			readersAwaitingSplit.put(subtaskId, ((RequestSplitEvent) sourceEvent).hostName());
			assignSplits();
		} else {
			LOG.error("Received unrecognized event: {}", sourceEvent);
		}
	}

	/** Returns splits of a failed reader to the assigner for re-assignment. */
	@Override
	public void addSplitsBack(List<HiveSourceSplit> splits, int subtaskId) {
		LOG.debug("Continuous Hive Source Enumerator adds splits back: {}", splits);
		stateLock.writeLock().lock();
		try {
			splitAssigner.addSplits(new ArrayList<>(splits));
		} finally {
			stateLock.writeLock().unlock();
		}
	}

	@Override
	public void addReader(int subtaskId) {
		// this source is purely lazy-pull-based, nothing to do upon registration
	}

	/** Snapshots the remaining splits plus the discovery progress (offset + seen partitions). */
	@Override
	public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState() throws Exception {
		stateLock.readLock().lock();
		try {
			// Double cast: the assigner works on FileSourceSplit, but this enumerator only
			// ever feeds it HiveSourceSplits.
			Collection<HiveSourceSplit> remainingSplits = (Collection<HiveSourceSplit>) (Collection<?>) splitAssigner.remainingSplits();
			return new ContinuousHivePendingSplitsCheckpoint(remainingSplits, currentReadOffset, seenPartitionsSinceOffset);
		} finally {
			stateLock.readLock().unlock();
		}
	}

	@Override
	public void close() throws IOException {
		try {
			fetcherContext.close();
		} catch (Exception e) {
			throw new IOException(e);
		}
	}

	/**
	 * One discovery pass, run on the async worker thread: fetches partitions beyond the
	 * current offset, creates splits for unseen ones, advances the offset and refreshes the
	 * seen-partition set. Returns null because results are handed over via the assigner.
	 */
	private Void monitorAndGetSplits() throws Exception {
		stateLock.writeLock().lock();
		try {
			List<Tuple2<Partition, T>> partitions = fetcher.fetchPartitions(fetcherContext, currentReadOffset);
			if (partitions.isEmpty()) {
				return null;
			}

			// Process in ascending offset order so maxOffset advances monotonically.
			partitions.sort(Comparator.comparing(o -> o.f1));
			List<HiveSourceSplit> newSplits = new ArrayList<>();
			// the max offset of new partitions
			T maxOffset = currentReadOffset;
			Set<List<String>> nextSeen = new HashSet<>();
			for (Tuple2<Partition, T> tuple2 : partitions) {
				Partition partition = tuple2.f0;
				List<String> partSpec = partition.getValues();
				// add() returning true means this partition was not processed before
				if (seenPartitionsSinceOffset.add(partSpec)) {
					T offset = tuple2.f1;
					if (offset.compareTo(currentReadOffset) > 0) {
						nextSeen.add(partSpec);
					}
					if (offset.compareTo(maxOffset) > 0) {
						maxOffset = offset;
					}
					LOG.info("Found new partition {} of table {}, generating splits for it",
							partSpec, tablePath.getFullName());
					newSplits.addAll(HiveSourceFileEnumerator.createInputSplits(
							0, Collections.singletonList(fetcherContext.toHiveTablePartition(partition)), jobConf));
				}
			}
			currentReadOffset = maxOffset;
			splitAssigner.addSplits(new ArrayList<>(newSplits));
			// Once the offset moved forward, only partitions beyond the old offset need to be
			// remembered; earlier ones can no longer be re-fetched. NOTE(review): assumes the
			// fetcher never returns partitions below the offset it is given — confirm.
			if (!nextSeen.isEmpty()) {
				seenPartitionsSinceOffset.clear();
				seenPartitionsSinceOffset.addAll(nextSeen);
			}
			return null;
		} finally {
			stateLock.writeLock().unlock();
		}
	}

	// Callback on the coordinator thread after each discovery pass; errors are logged and
	// the periodic discovery keeps running.
	private void handleNewSplits(Void v, Throwable error) {
		if (error != null) {
			LOG.error("Failed to enumerate files", error);
			return;
		}
		assignSplits();
	}

	// Hands out splits to waiting readers (in request order) until the assigner runs dry.
	private void assignSplits() {
		final Iterator<Map.Entry<Integer, String>> awaitingReader = readersAwaitingSplit.entrySet().iterator();

		stateLock.writeLock().lock();
		try {
			while (awaitingReader.hasNext()) {
				final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
				final String hostname = nextAwaiting.getValue();
				final int awaitingSubtask = nextAwaiting.getKey();
				final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
				if (nextSplit.isPresent()) {
					enumeratorContext.assignSplit((HiveSourceSplit) nextSplit.get(), awaitingSubtask);
					awaitingReader.remove();
				} else {
					// No split available right now; the reader stays queued for the next pass.
					break;
				}
			}
		} finally {
			stateLock.writeLock().unlock();
		}
	}
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index a765646..243e4a4 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -18,22 +18,37 @@
package org.apache.flink.connectors.hive;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.file.src.AbstractFileSource;
+import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
import org.apache.flink.table.filesystem.LimitableBulkFormat;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import static
org.apache.flink.connector.file.src.FileSource.DEFAULT_SPLIT_ASSIGNER;
@@ -46,22 +61,34 @@ public class HiveSource extends AbstractFileSource<RowData,
HiveSourceSplit> imp
private static final long serialVersionUID = 1L;
+ private final JobConfWrapper jobConfWrapper;
+ private final List<String> partitionKeys;
+ private final ContinuousPartitionFetcher<Partition, ?> fetcher;
+ private final HiveTableSource.HiveContinuousPartitionFetcherContext<?>
fetcherContext;
+ private final ObjectPath tablePath;
+
HiveSource(
+ Path[] inputPaths,
+ FileEnumerator.Provider fileEnumerator,
+ FileSplitAssigner.Provider splitAssigner,
+ BulkFormat<RowData, HiveSourceSplit> readerFormat,
+ @Nullable ContinuousEnumerationSettings
continuousEnumerationSettings,
JobConf jobConf,
- CatalogTable catalogTable,
- List<HiveTablePartition> partitions,
- @Nullable Long limit,
- String hiveVersion,
- boolean useMapRedReader,
- boolean isStreamingSource,
- RowType producedRowType) {
+ ObjectPath tablePath,
+ List<String> partitionKeys,
+ @Nullable ContinuousPartitionFetcher<Partition, ?>
fetcher,
+ @Nullable
HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext) {
super(
- new org.apache.flink.core.fs.Path[1],
- new
HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
- DEFAULT_SPLIT_ASSIGNER,
- createBulkFormat(new JobConf(jobConf),
catalogTable, hiveVersion, producedRowType, useMapRedReader, limit),
- null);
- Preconditions.checkArgument(!isStreamingSource, "HiveSource
currently only supports bounded mode");
+ inputPaths,
+ fileEnumerator,
+ splitAssigner,
+ readerFormat,
+ continuousEnumerationSettings);
+ this.jobConfWrapper = new JobConfWrapper(jobConf);
+ this.tablePath = tablePath;
+ this.partitionKeys = partitionKeys;
+ this.fetcher = fetcher;
+ this.fetcherContext = fetcherContext;
}
@Override
@@ -69,24 +96,140 @@ public class HiveSource extends
AbstractFileSource<RowData, HiveSourceSplit> imp
return HiveSourceSplitSerializer.INSTANCE;
}
- private static BulkFormat<RowData, HiveSourceSplit> createBulkFormat(
- JobConf jobConf,
- CatalogTable catalogTable,
- String hiveVersion,
- RowType producedRowType,
- boolean useMapRedReader,
- Long limit) {
- checkNotNull(catalogTable, "catalogTable can not be null.");
- return LimitableBulkFormat.create(
- new HiveBulkFormatAdapter(
- new JobConfWrapper(jobConf),
- catalogTable.getPartitionKeys(),
-
catalogTable.getSchema().getFieldNames(),
-
catalogTable.getSchema().getFieldDataTypes(),
- hiveVersion,
- producedRowType,
- useMapRedReader),
- limit
+ @Override
+ public
SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>>
getEnumeratorCheckpointSerializer() {
+ if (continuousPartitionedEnumerator()) {
+ return new
ContinuousHivePendingSplitsCheckpointSerializer(getSplitSerializer());
+ } else {
+ return super.getEnumeratorCheckpointSerializer();
+ }
+ }
+
+ @Override
+ public SplitEnumerator<HiveSourceSplit,
PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(
+ SplitEnumeratorContext<HiveSourceSplit> enumContext) {
+ if (continuousPartitionedEnumerator()) {
+ return createContinuousSplitEnumerator(
+ enumContext,
fetcherContext.getConsumeStartOffset(), Collections.emptyList(),
Collections.emptyList());
+ } else {
+ return super.createEnumerator(enumContext);
+ }
+ }
+
+ @Override
+ public SplitEnumerator<HiveSourceSplit,
PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(
+ SplitEnumeratorContext<HiveSourceSplit> enumContext,
PendingSplitsCheckpoint<HiveSourceSplit> checkpoint) {
+ if (continuousPartitionedEnumerator()) {
+ Preconditions.checkState(checkpoint instanceof
ContinuousHivePendingSplitsCheckpoint,
+ "Illegal type of splits checkpoint %s
for streaming read partitioned table", checkpoint.getClass().getName());
+ ContinuousHivePendingSplitsCheckpoint hiveCheckpoint =
(ContinuousHivePendingSplitsCheckpoint) checkpoint;
+ return createContinuousSplitEnumerator(
+ enumContext,
hiveCheckpoint.getCurrentReadOffset(),
hiveCheckpoint.getSeenPartitionsSinceOffset(), hiveCheckpoint.getSplits());
+ } else {
+ return super.restoreEnumerator(enumContext, checkpoint);
+ }
+ }
+
+ private boolean continuousPartitionedEnumerator() {
+ return getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED &&
!partitionKeys.isEmpty();
+ }
+
+ private SplitEnumerator<HiveSourceSplit,
PendingSplitsCheckpoint<HiveSourceSplit>> createContinuousSplitEnumerator(
+ SplitEnumeratorContext<HiveSourceSplit> enumContext,
+ Comparable<?> currentReadOffset,
+ Collection<List<String>> seenPartitions,
+ Collection<HiveSourceSplit> splits) {
+ return new ContinuousHiveSplitEnumerator(
+ enumContext,
+ currentReadOffset,
+ seenPartitions,
+ getAssignerFactory().create(new
ArrayList<>(splits)),
+
getContinuousEnumerationSettings().getDiscoveryInterval().toMillis(),
+ jobConfWrapper.conf(),
+ tablePath,
+ fetcher,
+ fetcherContext
);
}
+
+ /**
+ * Builder to build HiveSource instances.
+ */
+ public static class HiveSourceBuilder extends
AbstractFileSourceBuilder<RowData, HiveSourceSplit, HiveSourceBuilder> {
+
+ private final JobConf jobConf;
+ private final ObjectPath tablePath;
+ private final List<String> partitionKeys;
+
+ private ContinuousPartitionFetcher<Partition, ?> fetcher = null;
+ private
HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext = null;
+
+ HiveSourceBuilder(
+ JobConf jobConf,
+ ObjectPath tablePath,
+ CatalogTable catalogTable,
+ List<HiveTablePartition> partitions,
+ @Nullable Long limit,
+ String hiveVersion,
+ boolean useMapRedReader,
+ RowType producedRowType) {
+ super(
+ new Path[1],
+ createBulkFormat(new JobConf(jobConf),
catalogTable, hiveVersion, producedRowType, useMapRedReader, limit),
+ new
HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
+ null);
+ this.jobConf = jobConf;
+ this.tablePath = tablePath;
+ this.partitionKeys = catalogTable.getPartitionKeys();
+ }
+
+ @Override
+ public HiveSource build() {
+ FileSplitAssigner.Provider splitAssigner =
continuousSourceSettings == null || partitionKeys.isEmpty() ?
+ DEFAULT_SPLIT_ASSIGNER :
SimpleSplitAssigner::new;
+ return new HiveSource(
+ inputPaths,
+ fileEnumerator,
+ splitAssigner,
+ readerFormat,
+ continuousSourceSettings,
+ jobConf,
+ tablePath,
+ partitionKeys,
+ fetcher,
+ fetcherContext
+ );
+ }
+
+ public HiveSourceBuilder
setFetcher(ContinuousPartitionFetcher<Partition, ?> fetcher) {
+ this.fetcher = fetcher;
+ return this;
+ }
+
+ public HiveSourceBuilder
setFetcherContext(HiveTableSource.HiveContinuousPartitionFetcherContext<?>
fetcherContext) {
+ this.fetcherContext = fetcherContext;
+ return this;
+ }
+
+ private static BulkFormat<RowData, HiveSourceSplit>
createBulkFormat(
+ JobConf jobConf,
+ CatalogTable catalogTable,
+ String hiveVersion,
+ RowType producedRowType,
+ boolean useMapRedReader,
+ Long limit) {
+ checkNotNull(catalogTable, "catalogTable can not be
null.");
+ return LimitableBulkFormat.create(
+ new HiveBulkFormatAdapter(
+ new
JobConfWrapper(jobConf),
+
catalogTable.getPartitionKeys(),
+
catalogTable.getSchema().getFieldNames(),
+
catalogTable.getSchema().getFieldDataTypes(),
+ hiveVersion,
+ producedRowType,
+ useMapRedReader),
+ limit
+ );
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index ec127e5..f158625 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -20,27 +20,18 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connectors.hive.read.HiveContinuousMonitoringFunction;
import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
-import org.apache.flink.connectors.hive.read.HiveTableFileInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
-import org.apache.flink.connectors.hive.read.TimestampedHiveInputSplit;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
@@ -55,9 +46,7 @@ import
org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
-import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
@@ -77,11 +66,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import static
org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions;
import static
org.apache.flink.table.catalog.hive.util.HiveTableUtil.checkAcidTable;
-import static
org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toLocalDateTime;
import static
org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
import static
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
import static
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_ENABLE;
@@ -151,23 +138,62 @@ public class HiveTableSource implements
catalogTable,
hiveShim,
remainingPartitions);
+ Configuration configuration =
Configuration.fromMap(catalogTable.getOptions());
- @SuppressWarnings("unchecked")
- TypeInformation<RowData> typeInfo =
- (TypeInformation<RowData>)
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
-
- HiveTableInputFormat inputFormat = getInputFormat(
+ HiveSource.HiveSourceBuilder sourceBuilder = new
HiveSource.HiveSourceBuilder(
+ jobConf,
+ tablePath,
+ catalogTable,
allHivePartitions,
-
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
-
+ limit,
+ hiveVersion,
+
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
+ (RowType)
getProducedDataType().getLogicalType());
if (isStreamingSource()) {
if (catalogTable.getPartitionKeys().isEmpty()) {
- return
createStreamSourceForNonPartitionTable(execEnv, typeInfo, inputFormat,
allHivePartitions.get(0));
- } else {
- return
createStreamSourceForPartitionTable(execEnv, typeInfo, inputFormat);
+ String consumeOrderStr =
configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+ ConsumeOrder consumeOrder =
ConsumeOrder.getConsumeOrder(consumeOrderStr);
+ if (consumeOrder !=
ConsumeOrder.CREATE_TIME_ORDER) {
+ throw new UnsupportedOperationException(
+ "Only " +
ConsumeOrder.CREATE_TIME_ORDER + " is supported for non partition table.");
+ }
}
+
+ Duration monitorInterval =
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+ ? DEFAULT_SCAN_MONITOR_INTERVAL
+ :
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+ sourceBuilder.monitorContinuously(monitorInterval);
+
+ if (!catalogTable.getPartitionKeys().isEmpty()) {
+ sourceBuilder.setFetcher(new
HiveContinuousPartitionFetcher());
+ final String defaultPartitionName =
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+ HiveContinuousPartitionFetcherContext<?>
fetcherContext = new HiveContinuousPartitionFetcherContext(
+ tablePath,
+ hiveShim,
+ new JobConfWrapper(jobConf),
+ catalogTable.getPartitionKeys(),
+
getProducedTableSchema().getFieldDataTypes(),
+
getProducedTableSchema().getFieldNames(),
+ configuration,
+ defaultPartitionName);
+ sourceBuilder.setFetcherContext(fetcherContext);
+ }
+ }
+
+ HiveSource hiveSource = sourceBuilder.build();
+ DataStreamSource<RowData> source = execEnv.fromSource(
+ hiveSource, WatermarkStrategy.noWatermarks(),
"HiveSource-" + tablePath.getFullName());
+
+ if (isStreamingSource()) {
+ return source;
} else {
- return createBatchSource(execEnv, allHivePartitions);
+ int parallelism = new
HiveParallelismInference(tablePath, flinkConf)
+ .infer(
+ () ->
HiveSourceFileEnumerator.getNumFiles(allHivePartitions, jobConf),
+ () ->
HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions,
jobConf).size())
+ .limit(limit);
+ return source.setParallelism(parallelism);
}
}
@@ -189,119 +215,6 @@ public class HiveTableSource implements
STREAMING_SOURCE_ENABLE.defaultValue().toString()));
}
- private DataStream<RowData>
createBatchSource(StreamExecutionEnvironment execEnv,
- List<HiveTablePartition> allHivePartitions) {
- HiveSource hiveSource = new HiveSource(
- jobConf,
- catalogTable,
- allHivePartitions,
- limit,
- hiveVersion,
-
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
- isStreamingSource(),
- (RowType)
getProducedDataType().getLogicalType());
- DataStreamSource<RowData> source = execEnv.fromSource(
- hiveSource, WatermarkStrategy.noWatermarks(),
"HiveSource-" + tablePath.getFullName());
-
- int parallelism = new HiveParallelismInference(tablePath,
flinkConf)
- .infer(
- () ->
HiveSourceFileEnumerator.getNumFiles(allHivePartitions, jobConf),
- () ->
HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions,
jobConf).size())
- .limit(limit);
-
- return source.setParallelism(parallelism);
- }
-
- @SuppressWarnings("unchecked")
- private DataStream<RowData> createStreamSourceForPartitionTable(
- StreamExecutionEnvironment execEnv,
- TypeInformation<RowData> typeInfo,
- HiveTableInputFormat inputFormat) {
- Configuration configuration = new Configuration();
- catalogTable.getOptions().forEach(configuration::setString);
-
- Duration monitorInterval =
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
- ? DEFAULT_SCAN_MONITOR_INTERVAL
- :
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
-
- ContinuousPartitionFetcher<Partition, Comparable> fetcher = new
HiveContinuousPartitionFetcher();
-
- final String defaultPartitionName =
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
-
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
- HiveContinuousPartitionFetcherContext fetcherContext = new
HiveContinuousPartitionFetcherContext(
- tablePath,
- hiveShim,
- new JobConfWrapper(jobConf),
- catalogTable.getPartitionKeys(),
- getProducedTableSchema().getFieldDataTypes(),
- getProducedTableSchema().getFieldNames(),
- configuration,
- defaultPartitionName);
-
- HiveContinuousMonitoringFunction monitoringFunction = new
HiveContinuousMonitoringFunction(
- fetcher,
- fetcherContext,
- jobConf,
- tablePath,
- execEnv.getParallelism(),
- monitorInterval.toMillis());
-
- ContinuousFileReaderOperatorFactory<RowData,
TimestampedHiveInputSplit> factory =
- new
ContinuousFileReaderOperatorFactory<>(inputFormat);
-
- String sourceName = "HiveMonitoringFunction";
- SingleOutputStreamOperator<RowData> source = execEnv
- .addSource(monitoringFunction, sourceName)
- .transform("Split Reader: " + sourceName,
typeInfo, factory);
-
- return new DataStreamSource<>(source);
- }
-
- private DataStream<RowData> createStreamSourceForNonPartitionTable(
- StreamExecutionEnvironment execEnv,
- TypeInformation<RowData> typeInfo,
- HiveTableInputFormat inputFormat,
- HiveTablePartition hiveTable) {
- HiveTableFileInputFormat fileInputFormat = new
HiveTableFileInputFormat(inputFormat, hiveTable);
-
- Configuration configuration = new Configuration();
- catalogTable.getOptions().forEach(configuration::setString);
- String consumeOrderStr =
configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
- ConsumeOrder consumeOrder =
ConsumeOrder.getConsumeOrder(consumeOrderStr);
- if (consumeOrder != ConsumeOrder.CREATE_TIME_ORDER) {
- throw new UnsupportedOperationException(
- "Only " +
ConsumeOrder.CREATE_TIME_ORDER + " is supported for non partition table.");
- }
-
- String consumeOffset =
configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET);
- long currentReadTime = 0L;
- if
(configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET)) {
- currentReadTime =
TimestampData.fromLocalDateTime(toLocalDateTime(consumeOffset))
- .toTimestamp().getTime();
- }
-
- Duration monitorInterval =
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
- ? DEFAULT_SCAN_MONITOR_INTERVAL
- :
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
-
- ContinuousFileMonitoringFunction<RowData> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(
- fileInputFormat,
-
FileProcessingMode.PROCESS_CONTINUOUSLY,
- execEnv.getParallelism(),
- monitorInterval.toMillis(),
- currentReadTime);
-
- ContinuousFileReaderOperatorFactory<RowData,
TimestampedFileInputSplit> factory =
- new
ContinuousFileReaderOperatorFactory<>(fileInputFormat);
-
- String sourceName = "HiveFileMonitoringFunction";
- SingleOutputStreamOperator<RowData> source =
execEnv.addSource(monitoringFunction, sourceName)
- .transform("Split Reader: " + sourceName,
typeInfo, factory);
-
- return new DataStreamSource<>(source);
- }
-
@VisibleForTesting
HiveTableInputFormat getInputFormat(List<HiveTablePartition>
allHivePartitions, boolean useMapRedReader) {
return new HiveTableInputFormat(
@@ -365,12 +278,6 @@ public class HiveTableSource implements
this.projectedFields =
Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
}
- private static List<String> partitionSpecToValues(Map<String, String>
spec, List<String> partitionColNames) {
- Preconditions.checkArgument(spec.size() ==
partitionColNames.size() && spec.keySet().containsAll(partitionColNames),
- "Partition spec (%s) and partition column names
(%s) doesn't match", spec, partitionColNames);
- return
partitionColNames.stream().map(spec::get).collect(Collectors.toList());
- }
-
@Override
public String asSummaryString() {
return "HiveSource";
@@ -394,7 +301,7 @@ public class HiveTableSource implements
* PartitionFetcher.Context for {@link ContinuousPartitionFetcher}.
*/
@SuppressWarnings("unchecked")
- public static class HiveContinuousPartitionFetcherContext<T extends
Comparable> extends HivePartitionFetcherContextBase<Partition>
+ public static class HiveContinuousPartitionFetcherContext<T extends
Comparable<T>> extends HivePartitionFetcherContextBase<Partition>
implements
ContinuousPartitionFetcher.Context<Partition, T> {
private static final long serialVersionUID = 1L;
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
deleted file mode 100644
index 5abb67a..0000000
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.hive.read;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import
org.apache.flink.connectors.hive.HiveTableSource.HiveContinuousPartitionFetcherContext;
-import org.apache.flink.connectors.hive.JobConfWrapper;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * This is the single (non-parallel) monitoring task which takes a {@link
HiveTableInputFormat},
- * it is responsible for:
- *
- * <ol>
- * <li>Monitoring partitions of hive meta store.</li>
- * <li>Deciding which partitions should be further read and processed.</li>
- * <li>Creating the {@link HiveTableInputSplit splits} corresponding to
those partitions.</li>
- * <li>Assigning them to downstream tasks for further processing.</li>
- * </ol>
- *
- * <p>The splits to be read are forwarded to the downstream {@link
ContinuousFileReaderOperator}
- * which can have parallelism greater than one.
- *
- * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in
ascending partition time order,
- * based on the partition time of the partitions they belong to.
- *
- * @param <T> The serializer type of partition offset.
- */
-public class HiveContinuousMonitoringFunction<T extends Comparable>
- extends RichSourceFunction<TimestampedHiveInputSplit>
- implements CheckpointedFunction {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG =
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
-
- private final ContinuousPartitionFetcher<Partition, T> fetcher;
- private final HiveContinuousPartitionFetcherContext<T> fetcherContext;
-
- /** The parallelism of the downstream readers. */
- private final int readerParallelism;
-
- /** The interval between consecutive path scans. */
- private final long interval;
- private ObjectPath tablePath;
- private final JobConfWrapper conf;
-
- private volatile boolean isRunning = true;
-
- /** The maximum partition read offset seen so far. */
- private volatile T currentReadOffset;
-
- private transient Object checkpointLock;
-
- private transient ListState<T> currReadOffsetState;
-
- private transient ListState<List<List<String>>> distinctPartsState;
-
- private transient Set<List<String>> distinctPartitions;
-
- public HiveContinuousMonitoringFunction(
- ContinuousPartitionFetcher<Partition, T> fetcher,
- HiveContinuousPartitionFetcherContext<T> fetcherContext,
- JobConf conf,
- ObjectPath tablePath,
- int readerParallelism,
- long interval) {
- this.conf = new JobConfWrapper(conf);
- this.tablePath = tablePath;
- this.interval = interval;
- this.readerParallelism = Math.max(readerParallelism, 1);
- this.fetcher = fetcher;
- this.fetcherContext = fetcherContext;
- this.currentReadOffset = fetcherContext.getConsumeStartOffset();
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context)
throws Exception {
- this.fetcherContext.open();
-
- this.currReadOffsetState =
context.getOperatorStateStore().getListState(
- new ListStateDescriptor<>(
- "current-read-offset-state",
- fetcherContext.getTypeSerializer()
- )
- );
- this.distinctPartsState =
context.getOperatorStateStore().getListState(
- new ListStateDescriptor<>(
- "distinct-partitions-state",
- new ListSerializer<>(new
ListSerializer<>(StringSerializer.INSTANCE))
- )
- );
-
- this.distinctPartitions = new HashSet<>();
- if (context.isRestored()) {
- LOG.info("Restoring state for the {}.",
getClass().getSimpleName());
- this.currentReadOffset =
this.currReadOffsetState.get().iterator().next();
-
this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
- } else {
- LOG.info("No state to restore for the {}.",
getClass().getSimpleName());
- this.currentReadOffset =
fetcherContext.getConsumeStartOffset();
- }
- }
-
- @Override
- public void run(SourceContext<TimestampedHiveInputSplit> context)
throws Exception {
- checkpointLock = context.getCheckpointLock();
- while (isRunning) {
- synchronized (checkpointLock) {
- if (isRunning) {
- monitorAndForwardSplits(context);
- }
- }
- Thread.sleep(interval);
- }
- }
-
- private void monitorAndForwardSplits(
- SourceContext<TimestampedHiveInputSplit> context)
throws Exception {
- assert (Thread.holdsLock(checkpointLock));
-
- List<Tuple2<Partition, T>> partitions =
fetcher.fetchPartitions(this.fetcherContext, currentReadOffset);
- if (partitions.isEmpty()) {
- return;
- }
-
- partitions.sort(Comparator.comparing(o -> o.f1));
-
- T maxPartitionOffset = null;
- Set<List<String>> nextDistinctParts = new HashSet<>();
- for (Tuple2<Partition, T> tuple2 : partitions) {
- Partition partition = tuple2.f0;
- List<String> partSpec = partition.getValues();
- if (!this.distinctPartitions.contains(partSpec)) {
- this.distinctPartitions.add(partSpec);
- T partitionOffset = tuple2.f1;
- if
(partitionOffset.compareTo(currentReadOffset) > 0) {
- nextDistinctParts.add(partSpec);
- }
- if (maxPartitionOffset == null ||
partitionOffset.compareTo(maxPartitionOffset) > 0) {
- maxPartitionOffset = partitionOffset;
- }
- LOG.info("Found new partition {} of table {},
forwarding splits to downstream readers",
- partSpec,
tablePath.getFullName());
- HiveTableInputSplit[] splits =
HiveTableInputFormat.createInputSplits(
- this.readerParallelism,
-
Collections.singletonList(fetcherContext.toHiveTablePartition(partition)),
- this.conf.conf());
- long modificationTime =
fetcherContext.getModificationTime(partition, tuple2.f1);
- for (HiveTableInputSplit split : splits) {
- context.collect(new
TimestampedHiveInputSplit(modificationTime, split));
- }
- }
- }
-
- if (maxPartitionOffset != null ||
maxPartitionOffset.compareTo(currentReadOffset) > 0) {
- currentReadOffset = maxPartitionOffset;
- distinctPartitions.clear();
- distinctPartitions.addAll(nextDistinctParts);
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws
Exception {
- Preconditions.checkState(this.currReadOffsetState != null,
- "The " + getClass().getSimpleName() + " state
has not been properly initialized.");
-
- this.currReadOffsetState.clear();
- this.currReadOffsetState.add(this.currentReadOffset);
-
- this.distinctPartsState.clear();
- this.distinctPartsState.add(new
ArrayList<>(this.distinctPartitions));
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} checkpointed {}.",
getClass().getSimpleName(), currentReadOffset);
- }
- }
-
- @Override
- public void close() {
- cancel();
- }
-
- @Override
- public void cancel() {
- // this is to cover the case where cancel() is called before
the run()
- if (checkpointLock != null) {
- synchronized (checkpointLock) {
- isRunning = false;
- }
- } else {
- isRunning = false;
- }
- try {
- if (this.fetcherContext != null) {
- this.fetcherContext.close();
- }
- } catch (Exception e) {
- throw new TableException("failed to close the
context.");
- }
- }
-}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousPartitionFetcher.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousPartitionFetcher.java
index 67e0d5f..1d487be 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousPartitionFetcher.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousPartitionFetcher.java
@@ -33,7 +33,7 @@ import java.util.List;
* A {@link ContinuousPartitionFetcher} for hive table.
*/
@Internal
-public class HiveContinuousPartitionFetcher<T extends Comparable> implements
ContinuousPartitionFetcher<Partition, T> {
+public class HiveContinuousPartitionFetcher<T extends Comparable<T>>
implements ContinuousPartitionFetcher<Partition, T> {
private static final long serialVersionUID = 1L;
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/ContinuousPartitionFetcher.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/ContinuousPartitionFetcher.java
index 67a2dde..921e12e 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/ContinuousPartitionFetcher.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/ContinuousPartitionFetcher.java
@@ -33,7 +33,7 @@ import java.util.List;
* or create-time order, be String when fetches in partition-name
order.
*/
@Internal
-public interface ContinuousPartitionFetcher<P, T extends Comparable> extends
PartitionFetcher<P> {
+public interface ContinuousPartitionFetcher<P, T extends Comparable<T>>
extends PartitionFetcher<P> {
/**
* Fetch partitions by previous partition offset (Including).
@@ -47,7 +47,7 @@ public interface ContinuousPartitionFetcher<P, T extends
Comparable> extends Par
* @param <T> The type of partition offset, the type could be Long when
fetches in partition-time
* or create-time order, be String when fetches in
partition-name order.
*/
- interface Context<P, T extends Comparable> extends
PartitionFetcher.Context<P> {
+ interface Context<P, T extends Comparable<T>> extends
PartitionFetcher.Context<P> {
/**
* The table full path.
*/
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
index 6e322de..4b94af0 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
@@ -73,7 +73,7 @@ public interface PartitionFetcher<P> extends Serializable {
* @param <P> The type of partition value.
* @param <T> The tye of the partition offset, the partition
offset is comparable.
*/
- interface ComparablePartitionValue<P, T extends Comparable>
extends Serializable {
+ interface ComparablePartitionValue<P, T extends Comparable<T>>
extends Serializable {
/**
* Get the partition value.