This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 5983867  [FLINK-20241][hive] Improve exception message when hive deps are missing on JM/TM
5983867 is described below

commit 598386763115cb9fa3de29a5e70ec9fd0e0faf57
Author: Rui Li <[email protected]>
AuthorDate: Fri Nov 27 10:42:43 2020 +0800

    [FLINK-20241][hive] Improve exception message when hive deps are missing on JM/TM
    
    This closes #14203
---
 .../connectors/hive/CachedSerializedValue.java     |  49 +++++
 .../flink/connectors/hive/HiveTablePartition.java  |  17 +-
 .../flink/connectors/hive/JobConfWrapper.java      |  27 ++-
 .../hive/read/HiveCompactReaderFactory.java        |  16 +-
 .../connectors/hive/read/HiveTableInputFormat.java |  98 ++-------
 .../hive/read/TimestampedHiveInputSplit.java       | 237 ---------------------
 .../connectors/hive/write/HiveWriterFactory.java   |  12 +-
 .../hive/HiveDeserializeExceptionTest.java         | 110 ++++++++++
 8 files changed, 233 insertions(+), 333 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/CachedSerializedValue.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/CachedSerializedValue.java
new file mode 100644
index 0000000..41d70d8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/CachedSerializedValue.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+
+/**
+ * An extension of SerializedValue which caches the deserialized data.
+ */
+public class CachedSerializedValue<T> extends SerializedValue<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient T deserialized;
+
+       public CachedSerializedValue(T value) throws IOException {
+               super(value);
+       }
+
+       @Override
+       public T deserializeValue(ClassLoader loader) throws IOException, 
ClassNotFoundException {
+               if (deserialized == null) {
+                       deserialized = super.deserializeValue(loader);
+               }
+               return deserialized;
+       }
+
+       public T deserializeValue() throws IOException, ClassNotFoundException {
+               return 
deserializeValue(Thread.currentThread().getContextClassLoader());
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
index b670662..e5619b6 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -37,7 +38,7 @@ public class HiveTablePartition implements Serializable {
        private static final long serialVersionUID = 4145470177119940673L;
 
        /** Partition storage descriptor. */
-       private final StorageDescriptor storageDescriptor;
+       private final CachedSerializedValue<StorageDescriptor> 
storageDescriptor;
 
        /** The map of partition key names and their values. */
        private final Map<String, Object> partitionSpec;
@@ -50,13 +51,21 @@ public class HiveTablePartition implements Serializable {
        }
 
        public HiveTablePartition(StorageDescriptor storageDescriptor, 
Map<String, Object> partitionSpec, Properties tableProps) {
-               this.storageDescriptor = checkNotNull(storageDescriptor, 
"storageDescriptor can not be null");
+               try {
+                       this.storageDescriptor = new 
CachedSerializedValue<>(checkNotNull(storageDescriptor, "storageDescriptor can 
not be null"));
+               } catch (IOException e) {
+                       throw new FlinkHiveException("Failed to serialize 
StorageDescriptor", e);
+               }
                this.partitionSpec = checkNotNull(partitionSpec, "partitionSpec 
can not be null");
                this.tableProps = checkNotNull(tableProps, "tableProps can not 
be null");
        }
 
        public StorageDescriptor getStorageDescriptor() {
-               return storageDescriptor;
+               try {
+                       return storageDescriptor.deserializeValue();
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new FlinkHiveException("Failed to deserialize 
StorageDescriptor", e);
+               }
        }
 
        public Map<String, Object> getPartitionSpec() {
@@ -89,7 +98,7 @@ public class HiveTablePartition implements Serializable {
        @Override
        public String toString() {
                return "HiveTablePartition{" +
-                               "storageDescriptor=" + storageDescriptor +
+                               "storageDescriptor=" + getStorageDescriptor() +
                                ", partitionSpec=" + partitionSpec +
                                ", tableProps=" + tableProps +
                                '}';
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/JobConfWrapper.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/JobConfWrapper.java
index 505c555..13c1693 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/JobConfWrapper.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/JobConfWrapper.java
@@ -18,6 +18,8 @@
 package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.Credentials;
@@ -35,7 +37,7 @@ public class JobConfWrapper implements Serializable {
 
        private static final long serialVersionUID = 1L;
 
-       private JobConf jobConf;
+       private transient JobConf jobConf;
 
        public JobConfWrapper(JobConf jobConf) {
                this.jobConf = jobConf;
@@ -46,14 +48,29 @@ public class JobConfWrapper implements Serializable {
        }
 
        private void writeObject(ObjectOutputStream out) throws IOException {
-               jobConf.write(out);
+               out.defaultWriteObject();
+
+               // we write the jobConf through a separate serializer to avoid 
cryptic exceptions when it
+               // corrupts the serialization stream
+               final DataOutputSerializer ser = new DataOutputSerializer(256);
+               jobConf.write(ser);
+               out.writeInt(ser.length());
+               out.write(ser.getSharedBuffer(), 0, ser.length());
        }
 
        private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               if (jobConf == null) {
-                       jobConf = new JobConf();
+               in.defaultReadObject();
+
+               final byte[] data = new byte[in.readInt()];
+               in.readFully(data);
+               final DataInputDeserializer deser = new 
DataInputDeserializer(data);
+               this.jobConf = new JobConf();
+               try {
+                       jobConf.readFields(deser);
+               } catch (IOException e) {
+                       throw new IOException(
+                                       "Could not deserialize JobConf, the 
serialized and de-serialized don't match.", e);
                }
-               jobConf.readFields(in);
                Credentials currentUserCreds = 
HadoopInputFormatCommonBase.getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
                if (currentUserCreds != null) {
                        jobConf.getCredentials().addAll(currentUserCreds);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
index 7cf3d8a..eeb13fc 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
@@ -19,6 +19,8 @@
 package org.apache.flink.connectors.hive.read;
 
 import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connectors.hive.CachedSerializedValue;
+import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.connectors.hive.HiveTablePartition;
 import org.apache.flink.connectors.hive.JobConfWrapper;
 import org.apache.flink.core.fs.FileSystem;
@@ -53,7 +55,7 @@ public class HiveCompactReaderFactory implements 
CompactReader.Factory<RowData>
 
        private static final long serialVersionUID = 1L;
 
-       private final StorageDescriptor sd;
+       private final CachedSerializedValue<StorageDescriptor> sd;
        private final Properties properties;
        private final JobConfWrapper jobConfWrapper;
        private final List<String> partitionKeys;
@@ -72,7 +74,11 @@ public class HiveCompactReaderFactory implements 
CompactReader.Factory<RowData>
                        String hiveVersion,
                        RowType producedRowType,
                        boolean useMapRedReader) {
-               this.sd = sd;
+               try {
+                       this.sd = new CachedSerializedValue<>(sd);
+               } catch (IOException e) {
+                       throw new FlinkHiveException("Failed to serialize 
StorageDescriptor", e);
+               }
                this.properties = properties;
                this.jobConfWrapper = new JobConfWrapper(jobConf);
                this.partitionKeys = catalogTable.getPartitionKeys();
@@ -110,6 +116,10 @@ public class HiveCompactReaderFactory implements 
CompactReader.Factory<RowData>
                        partitionSpec.put(entry.getKey(), partitionValue);
                }
 
-               return new HiveTablePartition(sd, partitionSpec, properties);
+               try {
+                       return new HiveTablePartition(sd.deserializeValue(), 
partitionSpec, properties);
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new FlinkHiveException("Failed to deserialize 
StorageDescriptor", e);
+               }
        }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
index d28ae69..ea6249e 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
@@ -25,8 +25,8 @@ import 
org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
 import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.JobConfWrapper;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.data.GenericRowData;
@@ -41,15 +41,11 @@ import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -74,26 +70,26 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<RowData, H
        private static final String SCHEMA_EVOLUTION_COLUMNS = 
"schema.evolution.columns";
        private static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = 
"schema.evolution.columns.types";
 
-       private JobConf jobConf;
+       private final JobConfWrapper jobConf;
 
-       private String hiveVersion;
+       private final String hiveVersion;
 
-       private List<String> partitionKeys;
+       private final List<String> partitionKeys;
 
-       private DataType[] fieldTypes;
+       private final DataType[] fieldTypes;
 
-       private String[] fieldNames;
+       private final String[] fieldNames;
 
        //For non-partition hive table, partitions only contains one partition 
which partitionValues is empty.
-       private List<HiveTablePartition> partitions;
+       private final List<HiveTablePartition> partitions;
 
        // indices of fields to be returned, with projection applied (if any)
-       private int[] selectedFields;
+       private final int[] selectedFields;
 
        //We should limit the input read count of this splits, null represents 
no limit.
-       private Long limit;
+       private final Long limit;
 
-       private boolean useMapRedReader;
+       private final boolean useMapRedReader;
 
        private transient long currentReadCount = 0L;
 
@@ -102,27 +98,6 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<RowData, H
 
        public HiveTableInputFormat(
                        JobConf jobConf,
-                       CatalogTable catalogTable,
-                       List<HiveTablePartition> partitions,
-                       int[] projectedFields,
-                       Long limit,
-                       String hiveVersion,
-                       boolean useMapRedReader) {
-               this(
-                               jobConf,
-                               checkNotNull(catalogTable, "catalogTable can 
not be null.")
-                                               .getPartitionKeys(),
-                               catalogTable.getSchema().getFieldDataTypes(),
-                               catalogTable.getSchema().getFieldNames(),
-                               projectedFields,
-                               limit,
-                               hiveVersion,
-                               useMapRedReader,
-                               partitions);
-       }
-
-       public HiveTableInputFormat(
-                       JobConf jobConf,
                        List<String> partitionKeys,
                        DataType[] fieldTypes,
                        String[] fieldNames,
@@ -132,7 +107,7 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<RowData, H
                        boolean useMapRedReader,
                        List<HiveTablePartition> partitions) {
                super(jobConf.getCredentials());
-               this.jobConf = new JobConf(jobConf);
+               this.jobConf = new JobConfWrapper(new JobConf(jobConf));
                this.partitionKeys = partitionKeys;
                this.fieldTypes = fieldTypes;
                this.fieldNames = fieldNames;
@@ -145,7 +120,7 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<RowData, H
        }
 
        public JobConf getJobConf() {
-               return jobConf;
+               return jobConf.conf();
        }
 
        @Override
@@ -157,12 +132,12 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<RowData, H
                HiveTablePartition partition = split.getHiveTablePartition();
                if (!useMapRedReader && useOrcVectorizedRead(partition)) {
                        this.reader = new HiveVectorizedOrcSplitReader(
-                                       hiveVersion, jobConf, fieldNames, 
fieldTypes, selectedFields, split);
+                                       hiveVersion, jobConf.conf(), 
fieldNames, fieldTypes, selectedFields, split);
                } else if (!useMapRedReader && 
useParquetVectorizedRead(partition)) {
                        this.reader = new HiveVectorizedParquetSplitReader(
-                                       hiveVersion, jobConf, fieldNames, 
fieldTypes, selectedFields, split);
+                                       hiveVersion, jobConf.conf(), 
fieldNames, fieldTypes, selectedFields, split);
                } else {
-                       JobConf clonedConf = new JobConf(jobConf);
+                       JobConf clonedConf = new JobConf(jobConf.conf());
                        addSchemaToConf(clonedConf);
                        this.reader = new HiveMapredSplitReader(clonedConf, 
partitionKeys, fieldTypes, selectedFields, split,
                                        
HiveShimLoader.loadHiveShim(hiveVersion));
@@ -301,7 +276,7 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<RowData, H
        @Override
        public HiveTableInputSplit[] createInputSplits(int minNumSplits)
                        throws IOException {
-               return createInputSplits(minNumSplits, partitions, jobConf);
+               return createInputSplits(minNumSplits, partitions, 
jobConf.conf());
        }
 
        public static HiveTableInputSplit[] createInputSplits(
@@ -358,7 +333,7 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<RowData, H
                        StorageDescriptor sd = partition.getStorageDescriptor();
                        Path inputPath = new Path(sd.getLocation());
                        if (fs == null) {
-                               fs = inputPath.getFileSystem(jobConf);
+                               fs = inputPath.getFileSystem(jobConf.conf());
                        }
                        // it's possible a partition exists in metastore but 
the data has been removed
                        if (!fs.exists(inputPath)) {
@@ -368,43 +343,4 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<RowData, H
                }
                return numFiles;
        }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Custom serialization methods
-       // 
--------------------------------------------------------------------------------------------
-
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               super.write(out);
-               jobConf.write(out);
-               out.writeObject(partitionKeys);
-               out.writeObject(fieldTypes);
-               out.writeObject(fieldNames);
-               out.writeObject(partitions);
-               out.writeObject(selectedFields);
-               out.writeObject(limit);
-               out.writeObject(hiveVersion);
-               out.writeBoolean(useMapRedReader);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               super.read(in);
-               if (jobConf == null) {
-                       jobConf = new JobConf();
-               }
-               jobConf.readFields(in);
-               jobConf.getCredentials().addAll(this.credentials);
-               Credentials currentUserCreds = 
getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-               if (currentUserCreds != null) {
-                       jobConf.getCredentials().addAll(currentUserCreds);
-               }
-               partitionKeys = (List<String>) in.readObject();
-               fieldTypes = (DataType[]) in.readObject();
-               fieldNames = (String[]) in.readObject();
-               partitions = (List<HiveTablePartition>) in.readObject();
-               selectedFields = (int[]) in.readObject();
-               limit = (Long) in.readObject();
-               hiveVersion = (String) in.readObject();
-               useMapRedReader = in.readBoolean();
-       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
deleted file mode 100644
index f3e46af..0000000
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.hive.read;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.functions.source.TimestampedInputSplit;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * A {@link HiveTableInputSplit} with {@link TimestampedInputSplit}.
- * Kryo serializer can not deal with hadoop split, need specific type 
information factory.
- *
- * <p>Note: this class has a natural ordering that is inconsistent with equals.
- */
-@TypeInfo(TimestampedHiveInputSplit.SplitTypeInfoFactory.class)
-public class TimestampedHiveInputSplit extends HiveTableInputSplit implements 
TimestampedInputSplit {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The modification time of the file this split belongs to. */
-       private final long modificationTime;
-
-       /**
-        * The state of the split. This information is used when
-        * restoring from a checkpoint and allows to resume reading the
-        * underlying file from the point we left off.
-        * */
-       private Serializable splitState;
-
-       public TimestampedHiveInputSplit(
-                       long modificationTime,
-                       HiveTableInputSplit split) {
-               super(
-                               split.getSplitNumber(),
-                               split.getHadoopInputSplit(),
-                               split.getJobConf(),
-                               split.getHiveTablePartition());
-               this.modificationTime = modificationTime;
-       }
-
-       @Override
-       public void setSplitState(Serializable state) {
-               this.splitState = state;
-       }
-
-       @Override
-       public Serializable getSplitState() {
-               return this.splitState;
-       }
-
-       @Override
-       public long getModificationTime() {
-               return modificationTime;
-       }
-
-       /**
-        * Note Again: this class has a natural ordering that is inconsistent 
with equals.
-        */
-       @Override
-       public int compareTo(TimestampedInputSplit o) {
-               TimestampedHiveInputSplit split = (TimestampedHiveInputSplit) o;
-               int modTimeComp = Long.compare(this.modificationTime, 
split.modificationTime);
-               if (modTimeComp != 0L) {
-                       return modTimeComp;
-               }
-
-               int sdComp = 
this.hiveTablePartition.getStorageDescriptor().compareTo(
-                               
split.hiveTablePartition.getStorageDescriptor());
-
-               return sdComp != 0 ? sdComp :
-                               this.getSplitNumber() - o.getSplitNumber();
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-               if (!super.equals(o)) {
-                       return false;
-               }
-               TimestampedHiveInputSplit that = (TimestampedHiveInputSplit) o;
-               return modificationTime == that.modificationTime;
-       }
-
-       @Override
-       public int hashCode() {
-               return Objects.hash(super.hashCode(), modificationTime);
-       }
-
-       @Override
-       public String toString() {
-               return "TimestampedHiveInputSplit{" +
-                               "modificationTime=" + modificationTime +
-                               ", splitState=" + splitState +
-                               ", hiveTablePartition=" + hiveTablePartition +
-                               '}';
-       }
-
-       /**
-        * {@link TypeInfoFactory} for {@link TimestampedHiveInputSplit}.
-        */
-       public static class SplitTypeInfoFactory
-                       extends TypeInfoFactory<TimestampedHiveInputSplit>
-                       implements Serializable {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public TypeInformation<TimestampedHiveInputSplit> 
createTypeInfo(
-                               Type t, Map genericParameters) {
-                       return new BasicTypeInfo<TimestampedHiveInputSplit>(
-                                       TimestampedHiveInputSplit.class,
-                                       new Class<?>[]{},
-                                       SplitTypeSerializer.INSTANCE,
-                                       null) {};
-               }
-       }
-
-       private static class SplitTypeSerializer extends 
TypeSerializerSingleton<TimestampedHiveInputSplit> {
-
-               private static final SplitTypeSerializer INSTANCE = new 
SplitTypeSerializer();
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public boolean isImmutableType() {
-                       return false;
-               }
-
-               @Override
-               public TimestampedHiveInputSplit createInstance() {
-                       return null;
-               }
-
-               @Override
-               public TimestampedHiveInputSplit copy(TimestampedHiveInputSplit 
from) {
-                       try {
-                               return InstantiationUtil.clone(from, 
Thread.currentThread().getContextClassLoader());
-                       } catch (IOException | ClassNotFoundException e) {
-                               throw new FlinkRuntimeException("Could not copy 
element via serialization: " + from, e);
-                       }
-               }
-
-               @Override
-               public TimestampedHiveInputSplit copy(TimestampedHiveInputSplit 
from, TimestampedHiveInputSplit reuse) {
-                       return copy(from);
-               }
-
-               @Override
-               public int getLength() {
-                       return -1;
-               }
-
-               @Override
-               public void serialize(TimestampedHiveInputSplit record, 
DataOutputView target) throws IOException {
-                       try (final DataOutputViewStream outViewWrapper = new 
DataOutputViewStream(target)) {
-                               
InstantiationUtil.serializeObject(outViewWrapper, record);
-                       }
-               }
-
-               @Override
-               public TimestampedHiveInputSplit deserialize(DataInputView 
source) throws IOException {
-                       try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(source)) {
-                               return InstantiationUtil.deserializeObject(
-                                               inViewWrapper,
-                                               
Thread.currentThread().getContextClassLoader());
-                       } catch (ClassNotFoundException e) {
-                               throw new IOException("Could not deserialize 
object.", e);
-                       }
-               }
-
-               @Override
-               public TimestampedHiveInputSplit deserialize(
-                               TimestampedHiveInputSplit reuse, DataInputView 
source) throws IOException {
-                       return deserialize(source);
-               }
-
-               @Override
-               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
-                       TimestampedHiveInputSplit tmp = deserialize(source);
-                       serialize(tmp, target);
-               }
-
-               @Override
-               public TypeSerializerSnapshot<TimestampedHiveInputSplit> 
snapshotConfiguration() {
-                       return new 
SplitTypeSerializer.SplitSerializerSnapshot();
-               }
-
-               /**
-                * Serializer configuration snapshot for compatibility and 
format evolution.
-                */
-               @SuppressWarnings("WeakerAccess")
-               public static final class SplitSerializerSnapshot extends
-                               
SimpleTypeSerializerSnapshot<TimestampedHiveInputSplit> {
-
-                       public SplitSerializerSnapshot() {
-                               super(SplitTypeSerializer::new);
-                       }
-               }
-       }
-}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveWriterFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveWriterFactory.java
index b0808ad..563b342 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveWriterFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveWriterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connectors.hive.write;
 
+import org.apache.flink.connectors.hive.CachedSerializedValue;
 import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.connectors.hive.JobConfWrapper;
 import org.apache.flink.table.api.TableSchema;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -68,7 +70,7 @@ public class HiveWriterFactory implements Serializable {
 
        private final Class hiveOutputFormatClz;
 
-       private final SerDeInfo serDeInfo;
+       private final CachedSerializedValue<SerDeInfo> serDeInfo;
 
        private final String[] allColumns;
 
@@ -115,7 +117,11 @@ public class HiveWriterFactory implements Serializable {
                                "The output format should be an instance of 
HiveOutputFormat");
                this.confWrapper = new JobConfWrapper(jobConf);
                this.hiveOutputFormatClz = hiveOutputFormatClz;
-               this.serDeInfo = serDeInfo;
+               try {
+                       this.serDeInfo = new CachedSerializedValue<>(serDeInfo);
+               } catch (IOException e) {
+                       throw new FlinkHiveException("Failed to serialize 
SerDeInfo", e);
+               }
                this.allColumns = schema.getFieldNames();
                this.allTypes = schema.getFieldDataTypes();
                this.partitionColumns = partitionColumns;
@@ -170,7 +176,7 @@ public class HiveWriterFactory implements Serializable {
                }
 
                JobConf jobConf = confWrapper.conf();
-               Object serdeLib = 
Class.forName(serDeInfo.getSerializationLib()).newInstance();
+               Object serdeLib = 
Class.forName(serDeInfo.deserializeValue().getSerializationLib()).newInstance();
                Preconditions.checkArgument(serdeLib instanceof Serializer && 
serdeLib instanceof Deserializer,
                                "Expect a SerDe lib implementing both 
Serializer and Deserializer, but actually got "
                                                + 
serdeLib.getClass().getName());
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDeserializeExceptionTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDeserializeExceptionTest.java
new file mode 100644
index 0000000..c569f44
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDeserializeExceptionTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
+import org.apache.flink.connectors.hive.write.HiveWriterFactory;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Sometimes users only add hive connector deps on client side but forget to add them on JM/TM.
+ * This test is to make sure users get a clear message when that happens, i.e. deserializing fails with a {@link ClassNotFoundException}.
+ */
+@RunWith(Parameterized.class)
+public class HiveDeserializeExceptionTest {
+
+       @Parameterized.Parameters(name = "{1}")
+       public static Object[] parameters() { // builds one {object, display name} row per Hive connector class under test
+               HiveWriterFactory writerFactory = new HiveWriterFactory( // constructed from empty/default arguments only
+                               new JobConf(),
+                               HiveIgnoreKeyTextOutputFormat.class,
+                               new SerDeInfo(),
+                               TableSchema.builder().build(),
+                               new String[0],
+                               new Properties(),
+                               
HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()),
+                               false
+               );
+
+               HiveCompactReaderFactory compactReaderFactory = new 
HiveCompactReaderFactory(
+                               new StorageDescriptor(),
+                               new Properties(),
+                               new JobConf(),
+                               new 
CatalogTableImpl(TableSchema.builder().build(), Collections.emptyMap(), null),
+                               HiveShimLoader.getHiveVersion(),
+                               RowType.of(DataTypes.INT().getLogicalType()),
+                               false
+               );
+
+               HiveSource hiveSource = new HiveSource.HiveSourceBuilder(
+                               new JobConf(),
+                               new ObjectPath("default", "foo"),
+                               new 
CatalogTableImpl(TableSchema.builder().field("i", DataTypes.INT()).build(), 
Collections.emptyMap(), null),
+                               Collections.singletonList(new 
HiveTablePartition(new StorageDescriptor(), new Properties())),
+                               null,
+                               HiveShimLoader.getHiveVersion(),
+                               false,
+                               
RowType.of(DataTypes.INT().getLogicalType())).build();
+
+               return new Object[][]{ // each row: {object to serialize, simple class name used as the test name}
+                               new Object[]{writerFactory, 
writerFactory.getClass().getSimpleName()},
+                               new Object[]{compactReaderFactory, 
compactReaderFactory.getClass().getSimpleName()},
+                               new Object[]{hiveSource, 
hiveSource.getClass().getSimpleName()}
+               };
+       }
+
+       @Parameterized.Parameter
+       public Object object; // parameter 0: the object that is serialized and then deserialized
+
+       @Parameterized.Parameter(1)
+       public String name; // parameter 1: simple class name, referenced as "{1}" in the parameterized test name
+
+       @Test
+       public void test() throws Exception { // round-trips the object through serialization using the parent class loader
+               ClassLoader parentLoader = 
object.getClass().getClassLoader().getParent();
+               assumeTrue(parentLoader != null); // skip when there is no parent class loader to deserialize with
+               byte[] bytes = InstantiationUtil.serializeObject(object);
+               try {
+                       InstantiationUtil.deserializeObject(bytes, 
parentLoader);
+                       fail("Exception not thrown"); // reaching this line means deserialization unexpectedly succeeded
+               } catch (ClassNotFoundException e) {
+                       // expected: a class needed for deserialization could not be found via the parent class loader
+               }
+       }
+}

Reply via email to