lirui-apache commented on a change in pull request #9864: [FLINK-14254][table] 
Introduce FileSystemOutputFormat for batch
URL: https://github.com/apache/flink/pull/9864#discussion_r344632446
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 ##########
 @@ -86,46 +83,36 @@ public HiveTableSink(JobConf jobConf, ObjectPath 
tablePath, CatalogTable table)
 
        @Override
        public OutputFormat<Row> getOutputFormat() {
-               List<String> partitionColumns = getPartitionFieldNames();
-               boolean isPartitioned = partitionColumns != null && 
!partitionColumns.isEmpty();
-               boolean isDynamicPartition = isPartitioned && 
partitionColumns.size() > staticPartitionSpec.size();
+               String[] partitionColumns = 
getPartitionFieldNames().toArray(new String[0]);
                String dbName = tablePath.getDatabaseName();
                String tableName = tablePath.getObjectName();
                try (HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), 
hiveVersion)) {
                        Table table = client.getTable(dbName, tableName);
                        StorageDescriptor sd = table.getSd();
-                       // here we use the sdLocation to store the output path 
of the job, which is always a staging dir
-                       String sdLocation = sd.getLocation();
-                       HiveTablePartition hiveTablePartition;
-                       if (isPartitioned) {
-                               validatePartitionSpec();
-                               if (isDynamicPartition) {
-                                       List<String> path = new ArrayList<>(2);
-                                       path.add(sd.getLocation());
-                                       if (!staticPartitionSpec.isEmpty()) {
-                                               
path.add(Warehouse.makePartName(staticPartitionSpec, false));
-                                       }
-                                       sdLocation = 
String.join(Path.SEPARATOR, path);
-                               } else {
-                                       List<Partition> partitions = 
client.listPartitions(dbName, tableName,
-                                                       new 
ArrayList<>(staticPartitionSpec.values()), (short) 1);
-                                       sdLocation = !partitions.isEmpty() ? 
partitions.get(0).getSd().getLocation() :
-                                                       sd.getLocation() + 
Path.SEPARATOR + Warehouse.makePartName(staticPartitionSpec, true);
-                               }
-
-                               sd.setLocation(toStagingDir(sdLocation, 
jobConf));
-                               hiveTablePartition = new HiveTablePartition(sd, 
new LinkedHashMap<>(staticPartitionSpec));
-                       } else {
-                               sd.setLocation(toStagingDir(sdLocation, 
jobConf));
-                               hiveTablePartition = new HiveTablePartition(sd, 
null);
-                       }
-                       return new HiveTableOutputFormat(
-                               jobConf,
-                               tablePath,
-                               catalogTable,
-                               hiveTablePartition,
-                               HiveReflectionUtils.getTableMetadata(hiveShim, 
table),
-                               overwrite);
+
+                       FileSystemOutputFormat.Builder<Row> builder = new 
FileSystemOutputFormat.Builder<>();
+                       builder.setColumnNames(tableSchema.getFieldNames());
+                       
builder.setDefaultPartName(jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                                       
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal));
+                       builder.setDynamicGrouped(dynamicGrouping);
+                       builder.setPartitionColumns(partitionColumns);
+                       builder.setFileSystemFactory(new 
HiveFileSystemFactory(jobConf));
+                       builder.setFormatFactory(new HiveOutputFormatFactory(
+                                       jobConf,
+                                       sd.getOutputFormat(),
+                                       sd.getSerdeInfo(),
+                                       tableSchema,
+                                       partitionColumns,
+                                       
HiveReflectionUtils.getTableMetadata(hiveShim, table),
+                                       hiveVersion));
+                       builder.setMetaStoreFactory(
+                                       new HiveMetaStoreFactory(jobConf, 
hiveVersion, dbName, tableName));
+                       builder.setOverwrite(overwrite);
+                       builder.setStaticPartitions(staticPartitionSpec);
+                       builder.setTmpPath(new org.apache.flink.core.fs.Path(
 
 Review comment:
   Requiring the caller to specify a temp path seems strange to me. IMO caller 
of the API should only care about what the final path should be and not how 
temp paths are generated.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to