luoyuxia commented on code in PR #19520:
URL: https://github.com/apache/flink/pull/19520#discussion_r933716216
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java:
##########
@@ -285,6 +304,78 @@ public Operation createInsertOperation(HiveParserCalcitePlanner analyzer, RelNod
                 Collections.emptyMap());
     }
+    private SinkModifyOperation createInsertIntoDirectoryOperation(
+            HiveParserQB topQB, QBMetaData qbMetaData, RelNode queryRelNode) {
+        String dest = topQB.getParseInfo().getClauseNamesForDest().iterator().next();
+        // get the location for insert into directory
+        String location = qbMetaData.getDestFileForAlias(dest);
+        // get whether it's for insert local directory
+        boolean isToLocal = qbMetaData.getDestTypeForAlias(dest) == QBMetaData.DEST_LOCAL_FILE;
+        HiveParserDirectoryDesc directoryDesc = topQB.getDirectoryDesc();
+
+        // set row format / stored as / location
+        Map<String, String> props = new HashMap<>();
+        HiveParserDDLSemanticAnalyzer.encodeRowFormat(directoryDesc.getRowFormatParams(), props);
+        HiveParserDDLSemanticAnalyzer.encodeStorageFormat(directoryDesc.getStorageFormat(), props);
+        props.put(TABLE_LOCATION_URI, location);
+
+        props.put(FactoryUtil.CONNECTOR.key(), HiveCatalogFactoryOptions.IDENTIFIER);
+        // mark it's for insert into directory
+        props.put(CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + IS_INSERT_DIRECTORY, "true");
+        // mark it's for insert into local directory or not
+        props.put(
+                CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + IS_TO_LOCAL_DIRECTORY,
+                String.valueOf(isToLocal));
+
+        List<RelDataTypeField> fieldList = queryRelNode.getRowType().getFieldList();
+        String[] colNameArr = new String[fieldList.size()];
+        String[] colTypeArr = new String[fieldList.size()];
+        for (int i = 0; i < fieldList.size(); i++) {
+            colNameArr[i] = fieldList.get(i).getName();
+            TypeInfo typeInfo = HiveParserTypeConverter.convert(fieldList.get(i).getType());
+            if (typeInfo.equals(TypeInfoFactory.voidTypeInfo)) {
+                colTypeArr[i] = TypeInfoFactory.stringTypeInfo.getTypeName();
+            } else {
+                colTypeArr[i] = typeInfo.getTypeName();
+            }
+        }
+
+        String colNames = String.join(",", colNameArr);
+        String colTypes = String.join(":", colTypeArr);
+        props.put("columns", colNames);
+        props.put("columns.types", colTypes);
+
+        PlannerQueryOperation plannerQueryOperation = new PlannerQueryOperation(queryRelNode);
Review Comment:
We don't need the extra logic for `insert overwrite directory`.
When inserting into a directory, there is no need for type conversion, which
exists to convert the query result into the dest table's schema. There is also
no need to adjust for static and dynamic partitions, which is only required
when the dest is a real table.
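
For illustration, here is a minimal standalone sketch of how the hunk above derives the `columns` and `columns.types` properties directly from the query's own row type, with no conversion to a destination schema. It is not the PR's code: the class name `DirectorySinkColumns` and the method `encodeColumns` are hypothetical, and plain strings stand in for Hive `TypeInfo` type names (assuming `void` and `string` are the names Hive reports for those two types).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DirectorySinkColumns {
    // Assumption: these strings stand in for the Hive type names of
    // TypeInfoFactory.voidTypeInfo and TypeInfoFactory.stringTypeInfo.
    static final String VOID_TYPE = "void";
    static final String STRING_TYPE = "string";

    /** Builds the "columns" / "columns.types" properties from the query's fields. */
    static Map<String, String> encodeColumns(List<String> names, List<String> typeNames) {
        if (names.size() != typeNames.size()) {
            throw new IllegalArgumentException("names and typeNames must have the same size");
        }
        String[] colTypes = new String[typeNames.size()];
        for (int i = 0; i < typeNames.size(); i++) {
            // A void-typed column (e.g. a NULL literal) is written as string,
            // mirroring the fallback in the hunk above.
            String type = typeNames.get(i);
            colTypes[i] = VOID_TYPE.equals(type) ? STRING_TYPE : type;
        }
        Map<String, String> props = new LinkedHashMap<>();
        props.put("columns", String.join(",", names));       // comma-separated names
        props.put("columns.types", String.join(":", colTypes)); // colon-separated types
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props =
                encodeColumns(List.of("id", "name", "note"), List.of("int", "string", "void"));
        System.out.println(props);
    }
}
```

Because the schema comes straight from the query result, there is no destination table to reconcile against, which is why the conversion and partition-adjustment steps can be skipped here.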