This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d4340d16708 [FLINK-30491][hive] Hive table partition supports lazy deserialization during runtime
d4340d16708 is described below
commit d4340d16708010394d7c57063b5dece8362d41d0
Author: fengli <[email protected]>
AuthorDate: Sat Dec 24 16:46:03 2022 +0800
[FLINK-30491][hive] Hive table partition supports lazy deserialization during runtime
This closes #21556
---
.../apache/flink/connectors/hive/HiveSource.java | 8 +--
.../flink/connectors/hive/HiveSourceBuilder.java | 13 +++-
.../hive/HiveSourceDynamicFileEnumerator.java | 10 ++--
.../connectors/hive/HiveSourceFileEnumerator.java | 13 ++--
.../hive/HiveTablePartitionSerializer.java | 68 +++++++++++++++++++++
.../connectors/hive/util/HivePartitionUtils.java | 30 ++++++++++
.../hive/util/HivePartitionUtilsTest.java | 70 ++++++++++++++++++++++
7 files changed, 197 insertions(+), 15 deletions(-)
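
In short, the enumerator providers no longer carry a List<HiveTablePartition> that is Java-deserialized wherever the source is shipped; they carry a List<byte[]> produced once at plan compile time and turned back into HiveTablePartition objects only when an enumerator is actually created, so the TaskManager never has to materialize a partition list it does not need. A minimal round-trip sketch using the helpers added in this commit (the partition setup mirrors the new test; the location string is a made-up example):

import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class PartitionBytesRoundTrip {
    public static void main(String[] args) {
        // A single partition, built the same way the new test below does.
        StorageDescriptor sd = new StorageDescriptor();
        sd.setLocation("/tmp/warehouse/p_0"); // hypothetical location
        List<HiveTablePartition> partitions =
                Collections.singletonList(new HiveTablePartition(sd, new Properties()));

        // Compile time: the partitions become opaque bytes inside the enumerator
        // provider, so a TaskManager deserializing the source never materializes them.
        List<byte[]> bytes = HivePartitionUtils.serializeHiveTablePartition(partitions);

        // Runtime, only inside Provider#create(): bytes back to partitions.
        List<HiveTablePartition> restored =
                HivePartitionUtils.deserializeHiveTablePartition(bytes);
        System.out.println(restored.equals(partitions)); // true; the new test relies on this
    }
}
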
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index d340ed616ea..5b768b0320a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -64,7 +64,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
private final String hiveVersion;
private final List<String> dynamicFilterPartitionKeys;
- private final List<HiveTablePartition> partitions;
+ private final List<byte[]> partitionBytes;
private final ContinuousPartitionFetcher<Partition, ?> fetcher;
private final HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext;
private final ObjectPath tablePath;
@@ -80,7 +80,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
List<String> partitionKeys,
String hiveVersion,
@Nullable List<String> dynamicFilterPartitionKeys,
- List<HiveTablePartition> partitions,
+ List<byte[]> partitionBytes,
@Nullable ContinuousPartitionFetcher<Partition, ?> fetcher,
@Nullable HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext) {
super(
@@ -94,7 +94,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
this.partitionKeys = partitionKeys;
this.hiveVersion = hiveVersion;
this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
- this.partitions = partitions;
+ this.partitionBytes = partitionBytes;
this.fetcher = fetcher;
this.fetcherContext = fetcherContext;
}
@@ -181,7 +181,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
new HiveSourceDynamicFileEnumerator.Provider(
tablePath.getFullName(),
dynamicFilterPartitionKeys,
- partitions,
+ partitionBytes,
hiveVersion,
jobConfWrapper),
getAssignerFactory());
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index bb6ee5a6c1e..86de17520a9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -234,11 +234,18 @@ public class HiveSourceBuilder {
continuousSourceSettings == null || partitionKeys.isEmpty()
? DEFAULT_SPLIT_ASSIGNER
: SimpleSplitAssigner::new;
+ List<byte[]> hiveTablePartitionBytes = Collections.emptyList();
+ if (partitions != null) {
+            // Serializing the HiveTablePartition list manually at compile time to avoid
+            // deserializing it in the TaskManager at runtime. The HiveTablePartition list is not
+            // needed by the TM.
+            hiveTablePartitionBytes = HivePartitionUtils.serializeHiveTablePartition(partitions);
+ }
+
return new HiveSource<>(
new Path[1],
new HiveSourceFileEnumerator.Provider(
-                        partitions != null ? partitions : Collections.emptyList(),
-                        new JobConfWrapper(jobConf)),
+                        hiveTablePartitionBytes, new JobConfWrapper(jobConf)),
splitAssigner,
bulkFormat,
continuousSourceSettings,
@@ -247,7 +254,7 @@ public class HiveSourceBuilder {
partitionKeys,
hiveVersion,
dynamicFilterPartitionKeys,
- partitions,
+ hiveTablePartitionBytes,
fetcher,
fetcherContext);
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
index 530e0f61e52..4d84fd5e048 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
@@ -189,19 +189,21 @@ public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
private final String table;
private final List<String> dynamicFilterPartitionKeys;
- private final List<HiveTablePartition> partitions;
+        // The binary HiveTablePartition list, serialized manually at compile time to avoid
+        // deserializing it in the TaskManager at runtime.
+        private final List<byte[]> partitionBytes;
private final String hiveVersion;
private final JobConfWrapper jobConfWrapper;
public Provider(
String table,
List<String> dynamicFilterPartitionKeys,
- List<HiveTablePartition> partitions,
+ List<byte[]> partitionBytes,
String hiveVersion,
JobConfWrapper jobConfWrapper) {
this.table = checkNotNull(table);
            this.dynamicFilterPartitionKeys = checkNotNull(dynamicFilterPartitionKeys);
- this.partitions = checkNotNull(partitions);
+ this.partitionBytes = checkNotNull(partitionBytes);
this.hiveVersion = checkNotNull(hiveVersion);
this.jobConfWrapper = checkNotNull(jobConfWrapper);
}
@@ -211,7 +213,7 @@ public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
return new HiveSourceDynamicFileEnumerator(
table,
dynamicFilterPartitionKeys,
- partitions,
+                    HivePartitionUtils.deserializeHiveTablePartition(partitionBytes),
hiveVersion,
jobConfWrapper.conf());
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
index 2373e747148..c30cd1c8e1b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;
@@ -224,17 +225,21 @@ public class HiveSourceFileEnumerator implements FileEnumerator {
private static final long serialVersionUID = 1L;
- private final List<HiveTablePartition> partitions;
+        // The binary HiveTablePartition list, serialized manually at compile time to avoid
+        // deserializing it in the TaskManager at runtime.
+        private final List<byte[]> partitionBytes;
private final JobConfWrapper jobConfWrapper;
-        public Provider(List<HiveTablePartition> partitions, JobConfWrapper jobConfWrapper) {
-            this.partitions = partitions;
+        public Provider(List<byte[]> partitionBytes, JobConfWrapper jobConfWrapper) {
+            this.partitionBytes = partitionBytes;
this.jobConfWrapper = jobConfWrapper;
}
@Override
public FileEnumerator create() {
-            return new HiveSourceFileEnumerator(partitions, jobConfWrapper.conf());
+            return new HiveSourceFileEnumerator(
+                    HivePartitionUtils.deserializeHiveTablePartition(partitionBytes),
+                    jobConfWrapper.conf());
}
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartitionSerializer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartitionSerializer.java
new file mode 100644
index 00000000000..57875bda400
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartitionSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** SerDe for {@link HiveTablePartition}. */
+public class HiveTablePartitionSerializer implements SimpleVersionedSerializer<HiveTablePartition> {
+
+ private static final int CURRENT_VERSION = 1;
+
+    public static final HiveTablePartitionSerializer INSTANCE = new HiveTablePartitionSerializer();
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+    public byte[] serialize(HiveTablePartition hiveTablePartition) throws IOException {
+ checkArgument(
+ hiveTablePartition.getClass() == HiveTablePartition.class,
+ "Cannot serialize subclasses of HiveTablePartition");
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
+ outputStream.writeObject(hiveTablePartition);
+ }
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ @Override
+    public HiveTablePartition deserialize(int version, byte[] serialized) throws IOException {
+ if (version == CURRENT_VERSION) {
+            try (ObjectInputStream inputStream =
+                    new ObjectInputStream(new ByteArrayInputStream(serialized))) {
+ return (HiveTablePartition) inputStream.readObject();
+ } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize HiveTablePartition", e);
+ }
+ } else {
+ throw new IOException("Unknown version: " + version);
+ }
+ }
+}
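
Note the SimpleVersionedSerializer contract at work here: serialize does not embed the version in the payload, so a caller is expected to keep getVersion() alongside the bytes and pass it back to deserialize, which rejects anything other than CURRENT_VERSION with an IOException ("Unknown version: ..."). A hedged usage sketch of the serializer on its own (the partition construction is illustrative, mirroring the test at the end of this commit):

import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.HiveTablePartitionSerializer;

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

import java.io.IOException;
import java.util.Properties;

public class SerializerVersionSketch {
    public static void main(String[] args) throws IOException {
        StorageDescriptor sd = new StorageDescriptor();
        sd.setLocation("/tmp/warehouse/p_0"); // hypothetical location
        HiveTablePartition partition = new HiveTablePartition(sd, new Properties());

        HiveTablePartitionSerializer serializer = HiveTablePartitionSerializer.INSTANCE;
        byte[] payload = serializer.serialize(partition);
        // Store the version next to the payload; it is not part of the bytes.
        int version = serializer.getVersion();

        HiveTablePartition copy = serializer.deserialize(version, payload);
        System.out.println(copy.equals(partition)); // true
        // serializer.deserialize(version + 1, payload) would throw IOException.
    }
}
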
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
index 6d4d83453d3..443f4fea1c8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive.util;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTablePartitionSerializer;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -290,4 +291,33 @@ public class HivePartitionUtils {
}
}
}
+
+    public static List<byte[]> serializeHiveTablePartition(
+            List<HiveTablePartition> hiveTablePartitions) {
+        List<byte[]> partitionBytes = new ArrayList<>(hiveTablePartitions.size());
+        try {
+            for (HiveTablePartition hiveTablePartition : hiveTablePartitions) {
+                partitionBytes.add(
+                        HiveTablePartitionSerializer.INSTANCE.serialize(hiveTablePartition));
+            }
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+        return partitionBytes;
+    }
+
+    public static List<HiveTablePartition> deserializeHiveTablePartition(
+            List<byte[]> partitionBytes) {
+        List<HiveTablePartition> hiveTablePartitions = new ArrayList<>(partitionBytes.size());
+        try {
+            for (byte[] bytes : partitionBytes) {
+                hiveTablePartitions.add(
+                        HiveTablePartitionSerializer.INSTANCE.deserialize(
+                                HiveTablePartitionSerializer.INSTANCE.getVersion(), bytes));
+            }
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+        return hiveTablePartitions;
+    }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HivePartitionUtilsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HivePartitionUtilsTest.java
new file mode 100644
index 00000000000..a57dcb7be2c
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HivePartitionUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.flink.connectors.hive.HiveTablePartition;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HivePartitionUtils}. */
+public class HivePartitionUtilsTest {
+
+ @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testHiveTablePartitionSerDe() throws Exception {
+        String baseFilePath =
+                Objects.requireNonNull(this.getClass().getResource("/orc/test.orc")).getPath();
+        File wareHouse = temporaryFolder.newFolder("testHiveTablePartitionSerDe");
+ int partitionNum = 10;
+        List<HiveTablePartition> expectedHiveTablePartitions = new ArrayList<>();
+ for (int i = 0; i < partitionNum; i++) {
+ // create partition directory
+ Path partitionPath = Paths.get(wareHouse.getPath(), "p_" + i);
+ Files.createDirectory(partitionPath);
+ // copy file to the partition directory
+            Files.copy(Paths.get(baseFilePath), Paths.get(partitionPath.toString(), "t.orc"));
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setLocation(partitionPath.toString());
+            expectedHiveTablePartitions.add(new HiveTablePartition(sd, new Properties()));
+ }
+
+        List<byte[]> hiveTablePartitionBytes =
+                HivePartitionUtils.serializeHiveTablePartition(expectedHiveTablePartitions);
+
+        List<HiveTablePartition> actualHiveTablePartitions =
+                HivePartitionUtils.deserializeHiveTablePartition(hiveTablePartitionBytes);
+
+        assertThat(actualHiveTablePartitions).isEqualTo(expectedHiveTablePartitions);
+ }
+}