This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 5983867 [FLINK-20241][hive] Improve exception message when hive deps are missing on JM/TM
5983867 is described below
commit 598386763115cb9fa3de29a5e70ec9fd0e0faf57
Author: Rui Li <[email protected]>
AuthorDate: Fri Nov 27 10:42:43 2020 +0800
[FLINK-20241][hive] Improve exception message when hive deps are missing on JM/TM
This closes #14203
---
.../connectors/hive/CachedSerializedValue.java | 49 +++++
.../flink/connectors/hive/HiveTablePartition.java | 17 +-
.../flink/connectors/hive/JobConfWrapper.java | 27 ++-
.../hive/read/HiveCompactReaderFactory.java | 16 +-
.../connectors/hive/read/HiveTableInputFormat.java | 98 ++-------
.../hive/read/TimestampedHiveInputSplit.java | 237 ---------------------
.../connectors/hive/write/HiveWriterFactory.java | 12 +-
.../hive/HiveDeserializeExceptionTest.java | 110 ++++++++++
8 files changed, 233 insertions(+), 333 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/CachedSerializedValue.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/CachedSerializedValue.java
new file mode 100644
index 0000000..41d70d8
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/CachedSerializedValue.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+
+/**
+ * An extension of SerializedValue which caches the deserialized data.
+ */
+public class CachedSerializedValue<T> extends SerializedValue<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient T deserialized;
+
+ public CachedSerializedValue(T value) throws IOException {
+ super(value);
+ }
+
+ @Override
+ public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
+ if (deserialized == null) {
+ deserialized = super.deserializeValue(loader);
+ }
+ return deserialized;
+ }
+
+ public T deserializeValue() throws IOException, ClassNotFoundException {
+ return deserializeValue(Thread.currentThread().getContextClassLoader());
+ }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
index b670662..e5619b6 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -37,7 +38,7 @@ public class HiveTablePartition implements Serializable {
private static final long serialVersionUID = 4145470177119940673L;
/** Partition storage descriptor. */
- private final StorageDescriptor storageDescriptor;
+ private final CachedSerializedValue<StorageDescriptor>
storageDescriptor;
/** The map of partition key names and their values. */
private final Map<String, Object> partitionSpec;
@@ -50,13 +51,21 @@ public class HiveTablePartition implements Serializable {
}
public HiveTablePartition(StorageDescriptor storageDescriptor,
Map<String, Object> partitionSpec, Properties tableProps) {
- this.storageDescriptor = checkNotNull(storageDescriptor,
"storageDescriptor can not be null");
+ try {
+ this.storageDescriptor = new CachedSerializedValue<>(checkNotNull(storageDescriptor, "storageDescriptor can not be null"));
+ } catch (IOException e) {
+ throw new FlinkHiveException("Failed to serialize StorageDescriptor", e);
+ }
this.partitionSpec = checkNotNull(partitionSpec, "partitionSpec
can not be null");
this.tableProps = checkNotNull(tableProps, "tableProps can not
be null");
}
public StorageDescriptor getStorageDescriptor() {
- return storageDescriptor;
+ try {
+ return storageDescriptor.deserializeValue();
+ } catch (IOException | ClassNotFoundException e) {
+ throw new FlinkHiveException("Failed to deserialize StorageDescriptor", e);
+ }
}
public Map<String, Object> getPartitionSpec() {
@@ -89,7 +98,7 @@ public class HiveTablePartition implements Serializable {
@Override
public String toString() {
return "HiveTablePartition{" +
- "storageDescriptor=" + storageDescriptor +
+ "storageDescriptor=" + getStorageDescriptor() +
", partitionSpec=" + partitionSpec +
", tableProps=" + tableProps +
'}';
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/JobConfWrapper.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/JobConfWrapper.java
index 505c555..13c1693 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/JobConfWrapper.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/JobConfWrapper.java
@@ -18,6 +18,8 @@
package org.apache.flink.connectors.hive;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
@@ -35,7 +37,7 @@ public class JobConfWrapper implements Serializable {
private static final long serialVersionUID = 1L;
- private JobConf jobConf;
+ private transient JobConf jobConf;
public JobConfWrapper(JobConf jobConf) {
this.jobConf = jobConf;
@@ -46,14 +48,29 @@ public class JobConfWrapper implements Serializable {
}
private void writeObject(ObjectOutputStream out) throws IOException {
- jobConf.write(out);
+ out.defaultWriteObject();
+
+ // we write the jobConf through a separate serializer to avoid cryptic exceptions when it
+ // corrupts the serialization stream
+ final DataOutputSerializer ser = new DataOutputSerializer(256);
+ jobConf.write(ser);
+ out.writeInt(ser.length());
+ out.write(ser.getSharedBuffer(), 0, ser.length());
}
private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
- if (jobConf == null) {
- jobConf = new JobConf();
+ in.defaultReadObject();
+
+ final byte[] data = new byte[in.readInt()];
+ in.readFully(data);
+ final DataInputDeserializer deser = new
DataInputDeserializer(data);
+ this.jobConf = new JobConf();
+ try {
+ jobConf.readFields(deser);
+ } catch (IOException e) {
+ throw new IOException(
+ "Could not deserialize JobConf, the serialized and de-serialized don't match.", e);
}
- jobConf.readFields(in);
Credentials currentUserCreds =
HadoopInputFormatCommonBase.getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
if (currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
index 7cf3d8a..eeb13fc 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
@@ -19,6 +19,8 @@
package org.apache.flink.connectors.hive.read;
import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connectors.hive.CachedSerializedValue;
+import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.core.fs.FileSystem;
@@ -53,7 +55,7 @@ public class HiveCompactReaderFactory implements
CompactReader.Factory<RowData>
private static final long serialVersionUID = 1L;
- private final StorageDescriptor sd;
+ private final CachedSerializedValue<StorageDescriptor> sd;
private final Properties properties;
private final JobConfWrapper jobConfWrapper;
private final List<String> partitionKeys;
@@ -72,7 +74,11 @@ public class HiveCompactReaderFactory implements
CompactReader.Factory<RowData>
String hiveVersion,
RowType producedRowType,
boolean useMapRedReader) {
- this.sd = sd;
+ try {
+ this.sd = new CachedSerializedValue<>(sd);
+ } catch (IOException e) {
+ throw new FlinkHiveException("Failed to serialize
StorageDescriptor", e);
+ }
this.properties = properties;
this.jobConfWrapper = new JobConfWrapper(jobConf);
this.partitionKeys = catalogTable.getPartitionKeys();
@@ -110,6 +116,10 @@ public class HiveCompactReaderFactory implements
CompactReader.Factory<RowData>
partitionSpec.put(entry.getKey(), partitionValue);
}
- return new HiveTablePartition(sd, partitionSpec, properties);
+ try {
+ return new HiveTablePartition(sd.deserializeValue(),
partitionSpec, properties);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new FlinkHiveException("Failed to deserialize
StorageDescriptor", e);
+ }
}
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
index d28ae69..ea6249e 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
@@ -25,8 +25,8 @@ import
org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.data.GenericRowData;
@@ -41,15 +41,11 @@ import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -74,26 +70,26 @@ public class HiveTableInputFormat extends
HadoopInputFormatCommonBase<RowData, H
private static final String SCHEMA_EVOLUTION_COLUMNS =
"schema.evolution.columns";
private static final String SCHEMA_EVOLUTION_COLUMNS_TYPES =
"schema.evolution.columns.types";
- private JobConf jobConf;
+ private final JobConfWrapper jobConf;
- private String hiveVersion;
+ private final String hiveVersion;
- private List<String> partitionKeys;
+ private final List<String> partitionKeys;
- private DataType[] fieldTypes;
+ private final DataType[] fieldTypes;
- private String[] fieldNames;
+ private final String[] fieldNames;
//For non-partition hive table, partitions only contains one partition
which partitionValues is empty.
- private List<HiveTablePartition> partitions;
+ private final List<HiveTablePartition> partitions;
// indices of fields to be returned, with projection applied (if any)
- private int[] selectedFields;
+ private final int[] selectedFields;
//We should limit the input read count of this splits, null represents
no limit.
- private Long limit;
+ private final Long limit;
- private boolean useMapRedReader;
+ private final boolean useMapRedReader;
private transient long currentReadCount = 0L;
@@ -102,27 +98,6 @@ public class HiveTableInputFormat extends
HadoopInputFormatCommonBase<RowData, H
public HiveTableInputFormat(
JobConf jobConf,
- CatalogTable catalogTable,
- List<HiveTablePartition> partitions,
- int[] projectedFields,
- Long limit,
- String hiveVersion,
- boolean useMapRedReader) {
- this(
- jobConf,
- checkNotNull(catalogTable, "catalogTable can
not be null.")
- .getPartitionKeys(),
- catalogTable.getSchema().getFieldDataTypes(),
- catalogTable.getSchema().getFieldNames(),
- projectedFields,
- limit,
- hiveVersion,
- useMapRedReader,
- partitions);
- }
-
- public HiveTableInputFormat(
- JobConf jobConf,
List<String> partitionKeys,
DataType[] fieldTypes,
String[] fieldNames,
@@ -132,7 +107,7 @@ public class HiveTableInputFormat extends
HadoopInputFormatCommonBase<RowData, H
boolean useMapRedReader,
List<HiveTablePartition> partitions) {
super(jobConf.getCredentials());
- this.jobConf = new JobConf(jobConf);
+ this.jobConf = new JobConfWrapper(new JobConf(jobConf));
this.partitionKeys = partitionKeys;
this.fieldTypes = fieldTypes;
this.fieldNames = fieldNames;
@@ -145,7 +120,7 @@ public class HiveTableInputFormat extends
HadoopInputFormatCommonBase<RowData, H
}
public JobConf getJobConf() {
- return jobConf;
+ return jobConf.conf();
}
@Override
@@ -157,12 +132,12 @@ public class HiveTableInputFormat extends
HadoopInputFormatCommonBase<RowData, H
HiveTablePartition partition = split.getHiveTablePartition();
if (!useMapRedReader && useOrcVectorizedRead(partition)) {
this.reader = new HiveVectorizedOrcSplitReader(
- hiveVersion, jobConf, fieldNames,
fieldTypes, selectedFields, split);
+ hiveVersion, jobConf.conf(),
fieldNames, fieldTypes, selectedFields, split);
} else if (!useMapRedReader &&
useParquetVectorizedRead(partition)) {
this.reader = new HiveVectorizedParquetSplitReader(
- hiveVersion, jobConf, fieldNames,
fieldTypes, selectedFields, split);
+ hiveVersion, jobConf.conf(),
fieldNames, fieldTypes, selectedFields, split);
} else {
- JobConf clonedConf = new JobConf(jobConf);
+ JobConf clonedConf = new JobConf(jobConf.conf());
addSchemaToConf(clonedConf);
this.reader = new HiveMapredSplitReader(clonedConf,
partitionKeys, fieldTypes, selectedFields, split,
HiveShimLoader.loadHiveShim(hiveVersion));
@@ -301,7 +276,7 @@ public class HiveTableInputFormat extends
HadoopInputFormatCommonBase<RowData, H
@Override
public HiveTableInputSplit[] createInputSplits(int minNumSplits)
throws IOException {
- return createInputSplits(minNumSplits, partitions, jobConf);
+ return createInputSplits(minNumSplits, partitions,
jobConf.conf());
}
public static HiveTableInputSplit[] createInputSplits(
@@ -358,7 +333,7 @@ public class HiveTableInputFormat extends
HadoopInputFormatCommonBase<RowData, H
StorageDescriptor sd = partition.getStorageDescriptor();
Path inputPath = new Path(sd.getLocation());
if (fs == null) {
- fs = inputPath.getFileSystem(jobConf);
+ fs = inputPath.getFileSystem(jobConf.conf());
}
// it's possible a partition exists in metastore but
the data has been removed
if (!fs.exists(inputPath)) {
@@ -368,43 +343,4 @@ public class HiveTableInputFormat extends
HadoopInputFormatCommonBase<RowData, H
}
return numFiles;
}
-
- //
--------------------------------------------------------------------------------------------
- // Custom serialization methods
- //
--------------------------------------------------------------------------------------------
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- super.write(out);
- jobConf.write(out);
- out.writeObject(partitionKeys);
- out.writeObject(fieldTypes);
- out.writeObject(fieldNames);
- out.writeObject(partitions);
- out.writeObject(selectedFields);
- out.writeObject(limit);
- out.writeObject(hiveVersion);
- out.writeBoolean(useMapRedReader);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
- super.read(in);
- if (jobConf == null) {
- jobConf = new JobConf();
- }
- jobConf.readFields(in);
- jobConf.getCredentials().addAll(this.credentials);
- Credentials currentUserCreds =
getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
- if (currentUserCreds != null) {
- jobConf.getCredentials().addAll(currentUserCreds);
- }
- partitionKeys = (List<String>) in.readObject();
- fieldTypes = (DataType[]) in.readObject();
- fieldNames = (String[]) in.readObject();
- partitions = (List<HiveTablePartition>) in.readObject();
- selectedFields = (int[]) in.readObject();
- limit = (Long) in.readObject();
- hiveVersion = (String) in.readObject();
- useMapRedReader = in.readBoolean();
- }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
deleted file mode 100644
index f3e46af..0000000
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.hive.read;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.functions.source.TimestampedInputSplit;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * A {@link HiveTableInputSplit} with {@link TimestampedInputSplit}.
- * Kryo serializer can not deal with hadoop split, need specific type
information factory.
- *
- * <p>Note: this class has a natural ordering that is inconsistent with equals.
- */
-@TypeInfo(TimestampedHiveInputSplit.SplitTypeInfoFactory.class)
-public class TimestampedHiveInputSplit extends HiveTableInputSplit implements
TimestampedInputSplit {
-
- private static final long serialVersionUID = 1L;
-
- /** The modification time of the file this split belongs to. */
- private final long modificationTime;
-
- /**
- * The state of the split. This information is used when
- * restoring from a checkpoint and allows to resume reading the
- * underlying file from the point we left off.
- * */
- private Serializable splitState;
-
- public TimestampedHiveInputSplit(
- long modificationTime,
- HiveTableInputSplit split) {
- super(
- split.getSplitNumber(),
- split.getHadoopInputSplit(),
- split.getJobConf(),
- split.getHiveTablePartition());
- this.modificationTime = modificationTime;
- }
-
- @Override
- public void setSplitState(Serializable state) {
- this.splitState = state;
- }
-
- @Override
- public Serializable getSplitState() {
- return this.splitState;
- }
-
- @Override
- public long getModificationTime() {
- return modificationTime;
- }
-
- /**
- * Note Again: this class has a natural ordering that is inconsistent
with equals.
- */
- @Override
- public int compareTo(TimestampedInputSplit o) {
- TimestampedHiveInputSplit split = (TimestampedHiveInputSplit) o;
- int modTimeComp = Long.compare(this.modificationTime,
split.modificationTime);
- if (modTimeComp != 0L) {
- return modTimeComp;
- }
-
- int sdComp =
this.hiveTablePartition.getStorageDescriptor().compareTo(
-
split.hiveTablePartition.getStorageDescriptor());
-
- return sdComp != 0 ? sdComp :
- this.getSplitNumber() - o.getSplitNumber();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- TimestampedHiveInputSplit that = (TimestampedHiveInputSplit) o;
- return modificationTime == that.modificationTime;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), modificationTime);
- }
-
- @Override
- public String toString() {
- return "TimestampedHiveInputSplit{" +
- "modificationTime=" + modificationTime +
- ", splitState=" + splitState +
- ", hiveTablePartition=" + hiveTablePartition +
- '}';
- }
-
- /**
- * {@link TypeInfoFactory} for {@link TimestampedHiveInputSplit}.
- */
- public static class SplitTypeInfoFactory
- extends TypeInfoFactory<TimestampedHiveInputSplit>
- implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public TypeInformation<TimestampedHiveInputSplit>
createTypeInfo(
- Type t, Map genericParameters) {
- return new BasicTypeInfo<TimestampedHiveInputSplit>(
- TimestampedHiveInputSplit.class,
- new Class<?>[]{},
- SplitTypeSerializer.INSTANCE,
- null) {};
- }
- }
-
- private static class SplitTypeSerializer extends
TypeSerializerSingleton<TimestampedHiveInputSplit> {
-
- private static final SplitTypeSerializer INSTANCE = new
SplitTypeSerializer();
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TimestampedHiveInputSplit createInstance() {
- return null;
- }
-
- @Override
- public TimestampedHiveInputSplit copy(TimestampedHiveInputSplit
from) {
- try {
- return InstantiationUtil.clone(from,
Thread.currentThread().getContextClassLoader());
- } catch (IOException | ClassNotFoundException e) {
- throw new FlinkRuntimeException("Could not copy
element via serialization: " + from, e);
- }
- }
-
- @Override
- public TimestampedHiveInputSplit copy(TimestampedHiveInputSplit
from, TimestampedHiveInputSplit reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(TimestampedHiveInputSplit record,
DataOutputView target) throws IOException {
- try (final DataOutputViewStream outViewWrapper = new
DataOutputViewStream(target)) {
-
InstantiationUtil.serializeObject(outViewWrapper, record);
- }
- }
-
- @Override
- public TimestampedHiveInputSplit deserialize(DataInputView
source) throws IOException {
- try (final DataInputViewStream inViewWrapper = new
DataInputViewStream(source)) {
- return InstantiationUtil.deserializeObject(
- inViewWrapper,
-
Thread.currentThread().getContextClassLoader());
- } catch (ClassNotFoundException e) {
- throw new IOException("Could not deserialize
object.", e);
- }
- }
-
- @Override
- public TimestampedHiveInputSplit deserialize(
- TimestampedHiveInputSplit reuse, DataInputView
source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target)
throws IOException {
- TimestampedHiveInputSplit tmp = deserialize(source);
- serialize(tmp, target);
- }
-
- @Override
- public TypeSerializerSnapshot<TimestampedHiveInputSplit>
snapshotConfiguration() {
- return new
SplitTypeSerializer.SplitSerializerSnapshot();
- }
-
- /**
- * Serializer configuration snapshot for compatibility and
format evolution.
- */
- @SuppressWarnings("WeakerAccess")
- public static final class SplitSerializerSnapshot extends
-
SimpleTypeSerializerSnapshot<TimestampedHiveInputSplit> {
-
- public SplitSerializerSnapshot() {
- super(SplitTypeSerializer::new);
- }
- }
- }
-}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveWriterFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveWriterFactory.java
index b0808ad..563b342 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveWriterFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveWriterFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.connectors.hive.write;
+import org.apache.flink.connectors.hive.CachedSerializedValue;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.table.api.TableSchema;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -68,7 +70,7 @@ public class HiveWriterFactory implements Serializable {
private final Class hiveOutputFormatClz;
- private final SerDeInfo serDeInfo;
+ private final CachedSerializedValue<SerDeInfo> serDeInfo;
private final String[] allColumns;
@@ -115,7 +117,11 @@ public class HiveWriterFactory implements Serializable {
"The output format should be an instance of
HiveOutputFormat");
this.confWrapper = new JobConfWrapper(jobConf);
this.hiveOutputFormatClz = hiveOutputFormatClz;
- this.serDeInfo = serDeInfo;
+ try {
+ this.serDeInfo = new CachedSerializedValue<>(serDeInfo);
+ } catch (IOException e) {
+ throw new FlinkHiveException("Failed to serialize
SerDeInfo", e);
+ }
this.allColumns = schema.getFieldNames();
this.allTypes = schema.getFieldDataTypes();
this.partitionColumns = partitionColumns;
@@ -170,7 +176,7 @@ public class HiveWriterFactory implements Serializable {
}
JobConf jobConf = confWrapper.conf();
- Object serdeLib =
Class.forName(serDeInfo.getSerializationLib()).newInstance();
+ Object serdeLib =
Class.forName(serDeInfo.deserializeValue().getSerializationLib()).newInstance();
Preconditions.checkArgument(serdeLib instanceof Serializer &&
serdeLib instanceof Deserializer,
"Expect a SerDe lib implementing both
Serializer and Deserializer, but actually got "
+
serdeLib.getClass().getName());
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDeserializeExceptionTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDeserializeExceptionTest.java
new file mode 100644
index 0000000..c569f44
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDeserializeExceptionTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
+import org.apache.flink.connectors.hive.write.HiveWriterFactory;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Sometimes users only add hive connector deps on client side but forget to add them on JM/TM.
+ * This test is to make sure users get a clear message when that happens.
+ */
+@RunWith(Parameterized.class)
+public class HiveDeserializeExceptionTest {
+
+ @Parameterized.Parameters(name = "{1}")
+ public static Object[] parameters() {
+ HiveWriterFactory writerFactory = new HiveWriterFactory(
+ new JobConf(),
+ HiveIgnoreKeyTextOutputFormat.class,
+ new SerDeInfo(),
+ TableSchema.builder().build(),
+ new String[0],
+ new Properties(),
+
HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()),
+ false
+ );
+
+ HiveCompactReaderFactory compactReaderFactory = new
HiveCompactReaderFactory(
+ new StorageDescriptor(),
+ new Properties(),
+ new JobConf(),
+ new
CatalogTableImpl(TableSchema.builder().build(), Collections.emptyMap(), null),
+ HiveShimLoader.getHiveVersion(),
+ RowType.of(DataTypes.INT().getLogicalType()),
+ false
+ );
+
+ HiveSource hiveSource = new HiveSource.HiveSourceBuilder(
+ new JobConf(),
+ new ObjectPath("default", "foo"),
+ new
CatalogTableImpl(TableSchema.builder().field("i", DataTypes.INT()).build(),
Collections.emptyMap(), null),
+ Collections.singletonList(new
HiveTablePartition(new StorageDescriptor(), new Properties())),
+ null,
+ HiveShimLoader.getHiveVersion(),
+ false,
+
RowType.of(DataTypes.INT().getLogicalType())).build();
+
+ return new Object[][]{
+ new Object[]{writerFactory,
writerFactory.getClass().getSimpleName()},
+ new Object[]{compactReaderFactory,
compactReaderFactory.getClass().getSimpleName()},
+ new Object[]{hiveSource,
hiveSource.getClass().getSimpleName()}
+ };
+ }
+
+ @Parameterized.Parameter
+ public Object object;
+
+ @Parameterized.Parameter(1)
+ public String name;
+
+ @Test
+ public void test() throws Exception {
+ ClassLoader parentLoader =
object.getClass().getClassLoader().getParent();
+ assumeTrue(parentLoader != null);
+ byte[] bytes = InstantiationUtil.serializeObject(object);
+ try {
+ InstantiationUtil.deserializeObject(bytes,
parentLoader);
+ fail("Exception not thrown");
+ } catch (ClassNotFoundException e) {
+ // expected
+ }
+ }
+}