lirui-apache commented on a change in pull request #9864: [FLINK-14254][table]
Introduce FileSystemOutputFormat for batch
URL: https://github.com/apache/flink/pull/9864#discussion_r350721200
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
##########
@@ -86,46 +83,36 @@ public HiveTableSink(JobConf jobConf, ObjectPath
tablePath, CatalogTable table)
@Override
public OutputFormat<Row> getOutputFormat() {
- List<String> partitionColumns = getPartitionFieldNames();
- boolean isPartitioned = partitionColumns != null &&
!partitionColumns.isEmpty();
- boolean isDynamicPartition = isPartitioned &&
partitionColumns.size() > staticPartitionSpec.size();
+ String[] partitionColumns =
getPartitionFieldNames().toArray(new String[0]);
String dbName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
try (HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class),
hiveVersion)) {
Table table = client.getTable(dbName, tableName);
StorageDescriptor sd = table.getSd();
- // here we use the sdLocation to store the output path
of the job, which is always a staging dir
- String sdLocation = sd.getLocation();
- HiveTablePartition hiveTablePartition;
- if (isPartitioned) {
- validatePartitionSpec();
- if (isDynamicPartition) {
- List<String> path = new ArrayList<>(2);
- path.add(sd.getLocation());
- if (!staticPartitionSpec.isEmpty()) {
-
path.add(Warehouse.makePartName(staticPartitionSpec, false));
- }
- sdLocation =
String.join(Path.SEPARATOR, path);
- } else {
- List<Partition> partitions =
client.listPartitions(dbName, tableName,
- new
ArrayList<>(staticPartitionSpec.values()), (short) 1);
- sdLocation = !partitions.isEmpty() ?
partitions.get(0).getSd().getLocation() :
- sd.getLocation() +
Path.SEPARATOR + Warehouse.makePartName(staticPartitionSpec, true);
- }
-
- sd.setLocation(toStagingDir(sdLocation,
jobConf));
- hiveTablePartition = new HiveTablePartition(sd,
new LinkedHashMap<>(staticPartitionSpec));
- } else {
- sd.setLocation(toStagingDir(sdLocation,
jobConf));
- hiveTablePartition = new HiveTablePartition(sd,
null);
- }
- return new HiveTableOutputFormat(
- jobConf,
- tablePath,
- catalogTable,
- hiveTablePartition,
- HiveReflectionUtils.getTableMetadata(hiveShim,
table),
- overwrite);
+
+ FileSystemOutputFormat.Builder<Row> builder = new
FileSystemOutputFormat.Builder<>();
+ builder.setColumnNames(tableSchema.getFieldNames());
+
builder.setDefaultPartName(jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal));
+ builder.setDynamicGrouped(dynamicGrouping);
+ builder.setPartitionColumns(partitionColumns);
+ builder.setFileSystemFactory(new
HiveFileSystemFactory(jobConf));
+ builder.setFormatFactory(new HiveOutputFormatFactory(
+ jobConf,
+ sd.getOutputFormat(),
+ sd.getSerdeInfo(),
+ tableSchema,
+ partitionColumns,
+
HiveReflectionUtils.getTableMetadata(hiveShim, table),
+ hiveVersion));
+ builder.setMetaStoreFactory(
+ new HiveMetaStoreFactory(jobConf,
hiveVersion, dbName, tableName));
+ builder.setOverwrite(overwrite);
+ builder.setStaticPartitions(staticPartitionSpec);
+ builder.setTmpPath(new org.apache.flink.core.fs.Path(
Review comment:
If temp path has to come from table location, e.g. as a sub-dir of table
location, then the builder should ask for the table location and generate the
temp path it needs. Otherwise, we should clearly define what kind of path we
need in the builder contract. It's not good practice to define an API that
takes an arbitrary path and implicitly rely on callers to pass something of a
specific structure.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services