JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233687


##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveRecordWriterFactory.java
##########
@@ -167,56 +151,65 @@ public HiveOutputFormat createOutputFormat(Path outPath) {
 			}
 		}
-			RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
+			return hiveShim.getHiveRecordWriter(
 					conf,
 					hiveOutputFormatClz,
 					recordSerDe.getSerializedClass(),
 					isCompressed,
 					tableProperties,
-					HadoopFileSystem.toHadoopPath(outPath));
-			return new HiveOutputFormat(recordWriter);
+					path);
 		} catch (Exception e) {
 			throw new FlinkHiveException(e);
 		}
 	}
-	private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
-
-		private final RecordWriter recordWriter;
+	public JobConf getJobConf() {
+		return confWrapper.conf();
+	}
-		private HiveOutputFormat(RecordWriter recordWriter) {
-			this.recordWriter = recordWriter;
-		}
+	private void initialize() throws Exception {
+		JobConf jobConf = confWrapper.conf();
+		Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
+		Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+				"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got " +
+				serdeLib.getClass().getName());
+		this.recordSerDe = (Serializer) serdeLib;
+		ReflectionUtils.setConf(recordSerDe, jobConf);
-		// converts a Row to a list of Hive objects so that Hive can serialize it
-		private Object getConvertedRow(Row record) {
-			List<Object> res = new ArrayList<>(numNonPartitionColumns);
-			for (int i = 0; i < numNonPartitionColumns; i++) {
-				res.add(hiveConversions[i].toHiveObject(record.getField(i)));
-			}
-			return res;
-		}
+		// TODO: support partition properties, for now assume they're same as table properties
+		SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
-		@Override
-		public void configure(Configuration parameters) {
+		this.formatFields = allColumns.length - partitionColumns.length;
+		this.hiveConversions = new HiveObjectConversion[formatFields];
+		this.converters = new DataFormatConverter[formatFields];
+		List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+		for (int i = 0; i < formatFields; i++) {
+			DataType type = allTypes[i];
+			ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+			objectInspectors.add(objectInspector);
+			hiveConversions[i] = HiveInspectors.getConversion(
+					objectInspector, type.getLogicalType(), hiveShim);
+			converters[i] = DataFormatConverters.getConverterForDataType(type);
 		}
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {
-		}
+		this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+				Arrays.asList(allColumns).subList(0, formatFields),
+				objectInspectors);
+	}
-		@Override
-		public void writeRecord(Row record) throws IOException {
-			try {
-				recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
-			} catch (SerDeException e) {
-				throw new IOException(e);
-			}
+	Writable toHiveWritable(Row row) throws SerDeException {

Review comment:
       I will rename to `HiveWriterFactory` and add `createRowConverter` methods.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
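For illustration only, here is a minimal sketch of what the `createRowConverter` method mentioned in the review comment could look like, built from the fields initialized in the diff above (`formatFields`, `hiveConversions`, `recordSerDe`, `formatInspector`). The method name comes from the comment; the `Function<Row, Writable>` signature and the exception handling are assumptions, not the actual code added in the PR.

	// Assumed extra imports beyond those already in the class:
	// java.util.function.Function, org.apache.flink.types.Row,
	// org.apache.hadoop.hive.serde2.SerDeException, org.apache.hadoop.io.Writable.
	public Function<Row, Writable> createRowConverter() {
		return row -> {
			// Convert each non-partition field of the Flink Row to its Hive object form,
			// mirroring the removed getConvertedRow logic.
			List<Object> fields = new ArrayList<>(formatFields);
			for (int i = 0; i < formatFields; i++) {
				fields.add(hiveConversions[i].toHiveObject(row.getField(i)));
			}
			try {
				// Serialize the Hive objects into a Writable that the Hive RecordWriter accepts.
				return recordSerDe.serialize(fields, formatInspector);
			} catch (SerDeException e) {
				// FlinkHiveException is unchecked, so it can be thrown from inside the lambda.
				throw new FlinkHiveException(e);
			}
		};
	}

Handing out a converter function like this keeps the SerDe and ObjectInspector details inside the factory, so the output format that consumes the Hive RecordWriter never needs to touch them.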