lirui-apache commented on a change in pull request #9864: [FLINK-14254][table] Introduce FileSystemOutputFormat for batch
URL: https://github.com/apache/flink/pull/9864#discussion_r350721200
 
 

 ##########
 File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 ##########
 @@ -86,46 +83,36 @@ public HiveTableSink(JobConf jobConf, ObjectPath tablePath, CatalogTable table)
 
 	@Override
 	public OutputFormat<Row> getOutputFormat() {
-		List<String> partitionColumns = getPartitionFieldNames();
-		boolean isPartitioned = partitionColumns != null && !partitionColumns.isEmpty();
-		boolean isDynamicPartition = isPartitioned && partitionColumns.size() > staticPartitionSpec.size();
+		String[] partitionColumns = getPartitionFieldNames().toArray(new String[0]);
 		String dbName = tablePath.getDatabaseName();
 		String tableName = tablePath.getObjectName();
 		try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
 			Table table = client.getTable(dbName, tableName);
 			StorageDescriptor sd = table.getSd();
-			// here we use the sdLocation to store the output path of the job, which is always a staging dir
-			String sdLocation = sd.getLocation();
-			HiveTablePartition hiveTablePartition;
-			if (isPartitioned) {
-				validatePartitionSpec();
-				if (isDynamicPartition) {
-					List<String> path = new ArrayList<>(2);
-					path.add(sd.getLocation());
-					if (!staticPartitionSpec.isEmpty()) {
-						path.add(Warehouse.makePartName(staticPartitionSpec, false));
-					}
-					sdLocation = String.join(Path.SEPARATOR, path);
-				} else {
-					List<Partition> partitions = client.listPartitions(dbName, tableName,
-							new ArrayList<>(staticPartitionSpec.values()), (short) 1);
-					sdLocation = !partitions.isEmpty() ? partitions.get(0).getSd().getLocation() :
-							sd.getLocation() + Path.SEPARATOR + Warehouse.makePartName(staticPartitionSpec, true);
-				}
-
-				sd.setLocation(toStagingDir(sdLocation, jobConf));
-				hiveTablePartition = new HiveTablePartition(sd, new LinkedHashMap<>(staticPartitionSpec));
-			} else {
-				sd.setLocation(toStagingDir(sdLocation, jobConf));
-				hiveTablePartition = new HiveTablePartition(sd, null);
-			}
-			return new HiveTableOutputFormat(
-				jobConf,
-				tablePath,
-				catalogTable,
-				hiveTablePartition,
-				HiveReflectionUtils.getTableMetadata(hiveShim, table),
-				overwrite);
+
+			FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
+			builder.setColumnNames(tableSchema.getFieldNames());
+			builder.setDefaultPartName(jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+					HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal));
+			builder.setDynamicGrouped(dynamicGrouping);
+			builder.setPartitionColumns(partitionColumns);
+			builder.setFileSystemFactory(new HiveFileSystemFactory(jobConf));
+			builder.setFormatFactory(new HiveOutputFormatFactory(
+					jobConf,
+					sd.getOutputFormat(),
+					sd.getSerdeInfo(),
+					tableSchema,
+					partitionColumns,
+					HiveReflectionUtils.getTableMetadata(hiveShim, table),
+					hiveVersion));
+			builder.setMetaStoreFactory(
+					new HiveMetaStoreFactory(jobConf, hiveVersion, dbName, tableName));
+			builder.setOverwrite(overwrite);
+			builder.setStaticPartitions(staticPartitionSpec);
+			builder.setTmpPath(new org.apache.flink.core.fs.Path(
 
 Review comment:
   If the temp path has to come from the table location, e.g. as a sub-directory of it, then the builder should ask for the table location and generate the temp path it needs. Otherwise, we should clearly define what kind of path the builder expects as part of its contract. It is not good practice to expose an API that takes an arbitrary path while implicitly relying on callers to pass something with a specific structure.
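
   To make the suggestion concrete, here is a minimal sketch of the alternative contract (this is not Flink's actual API; the class and method names are hypothetical). The caller supplies only the table location, and the builder derives the staging directory itself, so no caller can pass a path with the wrong structure:

   ```java
   import java.util.UUID;

   // Hypothetical sketch: the builder owns the staging-dir layout instead of
   // accepting an arbitrary tmp path from the caller.
   class OutputFormatBuilderSketch {
   	private String tableLocation;

   	// Caller supplies only the table location...
   	OutputFormatBuilderSketch setTableLocation(String location) {
   		this.tableLocation = location;
   		return this;
   	}

   	// ...and the builder generates the temp path it needs, e.g. a hidden
   	// sub-directory under the table location.
   	String buildStagingDir() {
   		if (tableLocation == null) {
   			throw new IllegalStateException("table location is required");
   		}
   		return tableLocation + "/.staging_" + UUID.randomUUID();
   	}
   }
   ```

   With this shape the "temp path is a sub-dir of the table location" invariant is enforced in one place rather than being an unstated expectation on every caller.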

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.