JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233687



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveRecordWriterFactory.java
##########
@@ -167,56 +151,65 @@ public HiveOutputFormat createOutputFormat(Path outPath) {
                                }
                        }
 
-                       RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
+                       return hiveShim.getHiveRecordWriter(
                                        conf,
                                        hiveOutputFormatClz,
                                        recordSerDe.getSerializedClass(),
                                        isCompressed,
                                        tableProperties,
-                                       HadoopFileSystem.toHadoopPath(outPath));
-                       return new HiveOutputFormat(recordWriter);
+                                       path);
                } catch (Exception e) {
                        throw new FlinkHiveException(e);
                }
        }
 
-       private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
-
-               private final RecordWriter recordWriter;
+       public JobConf getJobConf() {
+               return confWrapper.conf();
+       }
 
-               private HiveOutputFormat(RecordWriter recordWriter) {
-                       this.recordWriter = recordWriter;
-               }
+       private void initialize() throws Exception {
+               JobConf jobConf = confWrapper.conf();
+               Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
+               Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+                               "Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+                                               + serdeLib.getClass().getName());
+               this.recordSerDe = (Serializer) serdeLib;
+               ReflectionUtils.setConf(recordSerDe, jobConf);
 
-               // converts a Row to a list of Hive objects so that Hive can serialize it
-               private Object getConvertedRow(Row record) {
-                       List<Object> res = new ArrayList<>(numNonPartitionColumns);
-                       for (int i = 0; i < numNonPartitionColumns; i++) {
-                               res.add(hiveConversions[i].toHiveObject(record.getField(i)));
-                       }
-                       return res;
-               }
+               // TODO: support partition properties, for now assume they're same as table properties
+               SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
 
-               @Override
-               public void configure(Configuration parameters) {
+               this.formatFields = allColumns.length - partitionColumns.length;
+               this.hiveConversions = new HiveObjectConversion[formatFields];
+               this.converters = new DataFormatConverter[formatFields];
+               List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+               for (int i = 0; i < formatFields; i++) {
+                       DataType type = allTypes[i];
+                       ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+                       objectInspectors.add(objectInspector);
+                       hiveConversions[i] = HiveInspectors.getConversion(
+                                       objectInspector, type.getLogicalType(), hiveShim);
+                       converters[i] = DataFormatConverters.getConverterForDataType(type);
                }
 
-               @Override
-               public void open(int taskNumber, int numTasks) throws IOException {
-               }
+               this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+                               Arrays.asList(allColumns).subList(0, formatFields),
+                               objectInspectors);
+       }
 
-               @Override
-               public void writeRecord(Row record) throws IOException {
-                       try {
-                               recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
-                       } catch (SerDeException e) {
-                               throw new IOException(e);
-                       }
+       Writable toHiveWritable(Row row) throws SerDeException {

Review comment:
       I will rename to `HiveWriterFactory` and add `createRowConverter` methods.
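
       For illustration, a minimal sketch of what such a `createRowConverter` could look like on the renamed `HiveWriterFactory`, reusing the per-column `hiveConversions` and `formatFields` built in `initialize()` in this diff. The method name comes from the comment above, but the signature and return type (`java.util.function.Function`) are just assumptions here, not the final API:

       ```java
       // Hypothetical sketch only -- the real signature is up to this PR.
       // It packages the per-column HiveObjectConversion lookup from
       // initialize() into a reusable function that maps a Flink Row to the
       // List<Object> that recordSerDe.serialize(converted, formatInspector)
       // expects, i.e. the same conversion the old getConvertedRow(...) did.
       public Function<Row, List<Object>> createRowConverter() {
               return row -> {
                       List<Object> fields = new ArrayList<>(formatFields);
                       for (int i = 0; i < formatFields; i++) {
                               // convert each non-partition field to its Hive object
                               fields.add(hiveConversions[i].toHiveObject(row.getField(i)));
                       }
                       return fields;
               };
       }
       ```

       That would keep `toHiveWritable` trivial: apply the converter, then hand the result to `recordSerDe.serialize(...)` together with `formatInspector`.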




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

