JingsongLi commented on a change in pull request #12108:
URL: https://github.com/apache/flink/pull/12108#discussion_r426109314
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
##########
@@ -1450,4 +1489,253 @@ static boolean isGenericForGet(Map<String, String>
properties) {
return properties != null &&
Boolean.parseBoolean(properties.getOrDefault(CatalogConfig.IS_GENERIC,
"false"));
}
+ public static void disallowChangeIsGeneric(boolean oldIsGeneric,
boolean newIsGeneric) {
+ checkArgument(oldIsGeneric == newIsGeneric, "Changing whether a
metadata object is generic is not allowed");
+ }
+
+ private static AlterTableOp extractAlterTableOp(Map<String, String>
props) {
+ String opStr = props.remove(ALTER_TABLE_OP);
+ if (opStr != null) {
+ return AlterTableOp.valueOf(opStr);
+ }
+ return null;
+ }
+
+ private Table alterTableViaCatalogBaseTable(ObjectPath tablePath,
CatalogBaseTable baseTable, Table oldHiveTable) {
+ Table newHiveTable = instantiateHiveTable(tablePath, baseTable,
hiveConf);
+ // client.alter_table() requires a valid location
+ // thus, if new table doesn't have that, it reuses location of
the old table
+ if (!newHiveTable.getSd().isSetLocation()) {
+
newHiveTable.getSd().setLocation(oldHiveTable.getSd().getLocation());
+ }
+ return newHiveTable;
+ }
+
	/**
	 * Applies one ALTER TABLE operation by interpreting the DDL-derived properties.
	 *
	 * <p>Ordering matters in this method: each {@code newProps.remove(...)} both reads an
	 * operation argument AND consumes it, so the keys handled here never end up persisted
	 * as plain table properties.
	 *
	 * @param alterOp the operation to perform; must not be null (null would hit the default branch)
	 * @param hiveTable the table being altered; may be null when altering a partition —
	 *                  TODO confirm with callers, see ensureCascadeOnPartitionedTable's null check
	 * @param oldProps the existing properties, mutated in place for CHANGE_TBL_PROPS
	 * @param newProps the operation arguments, consumed key by key
	 * @param sd the storage descriptor to mutate (table-level or partition-level)
	 * @throws CatalogException on an unsupported operation or a failed cascade
	 */
	private void alterTableViaProperties(AlterTableOp alterOp, Table hiveTable, Map<String, String> oldProps,
			Map<String, String> newProps, StorageDescriptor sd) {
		switch (alterOp) {
			case CHANGE_TBL_PROPS:
				// plain property merge: new keys win over old ones
				oldProps.putAll(newProps);
				break;
			case CHANGE_LOCATION:
				HiveTableUtil.extractLocation(sd, newProps);
				break;
			case CHANGE_FILE_FORMAT:
				String newFileFormat = newProps.remove(STORED_AS_FILE_FORMAT);
				HiveTableUtil.setStorageFormat(sd, newFileFormat, hiveConf);
				break;
			case CHANGE_SERDE_PROPS:
				HiveTableUtil.extractRowFormat(sd, newProps);
				break;
			case ADD_COLUMNS:
			case REPLACE_COLUMNS:
				// CASCADE pushes the schema change down to every partition
				boolean cascade = Boolean.parseBoolean(newProps.remove(ALTER_COL_CASCADE));
				ensureCascadeOnPartitionedTable(hiveTable, cascade);
				// names/types are encoded as delimiter-joined lists; per-column comments
				// live under an indexed key and may be absent (null comment)
				String[] names = newProps.remove(ADD_REPLACE_COL_NAMES).split(HiveDDLUtils.COL_DELIMITER);
				String[] types = newProps.remove(ADD_REPLACE_COL_TYPES).split(HiveDDLUtils.COL_DELIMITER);
				checkArgument(names.length == types.length);
				List<FieldSchema> columns = new ArrayList<>(names.length);
				for (int i = 0; i < names.length; i++) {
					String comment = newProps.remove(String.format(ADD_REPLACE_COL_COMMENT_FORMAT, i));
					columns.add(new FieldSchema(names[i], calTypeStrToHiveTypeInfo(types[i].toLowerCase()).getTypeName(), comment));
				}
				addReplaceColumns(sd, columns, alterOp == REPLACE_COLUMNS);
				if (cascade) {
					try {
						// apply the same column change to every partition's own SD
						for (CatalogPartitionSpec spec : listPartitions(new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()))) {
							Partition partition = getHivePartition(hiveTable, spec);
							addReplaceColumns(partition.getSd(), columns, alterOp == REPLACE_COLUMNS);
							client.alter_partition(hiveTable.getDbName(), hiveTable.getTableName(), partition);
						}
					} catch (Exception e) {
						throw new CatalogException("Failed to cascade add/replace columns to partitions", e);
					}
				}
				break;
			case CHANGE_COLUMN:
				// 'cascade' was declared in the ADD/REPLACE_COLUMNS case above; switch
				// cases share one scope, so this is a reassignment, not a new declaration
				cascade = Boolean.parseBoolean(newProps.remove(ALTER_COL_CASCADE));
				ensureCascadeOnPartitionedTable(hiveTable, cascade);
				// Hive column names are case-insensitive, hence the toLowerCase() calls
				String oldName = newProps.remove(CHANGE_COL_OLD_NAME).toLowerCase();
				String newName = newProps.remove(CHANGE_COL_NEW_NAME).toLowerCase();
				String newType = newProps.remove(CHANGE_COL_NEW_TYPE).toLowerCase();
				String colComment = newProps.remove(CHANGE_COL_COMMENT);
				boolean first = Boolean.parseBoolean(newProps.remove(CHANGE_COL_FIRST));
				// AFTER is optional: null means "keep position" unless FIRST is set
				String after = newProps.remove(CHANGE_COL_AFTER);
				if (after != null) {
					after = after.toLowerCase();
				}
				changeColumn(oldName, newName, newType, colComment, first, after, sd);
				if (cascade) {
					try {
						for (CatalogPartitionSpec spec : listPartitions(new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()))) {
							Partition partition = getHivePartition(hiveTable, spec);
							changeColumn(oldName, newName, newType, colComment, first, after, partition.getSd());
							client.alter_partition(hiveTable.getDbName(), hiveTable.getTableName(), partition);
						}
					} catch (Exception e) {
						throw new CatalogException("Failed to cascade change column to partitions", e);
					}
				}
				break;
			default:
				throw new CatalogException("Unsupported alter table operation " + alterOp);
		}
	}
+
+ private static void ensureCascadeOnPartitionedTable(Table hiveTable,
boolean cascade) {
+ if (cascade) {
+ if (hiveTable == null) {
+ throw new CatalogException("Alter columns
cascade for a partition");
+ }
+ if (!isTablePartitioned(hiveTable)) {
+ throw new CatalogException("Alter columns
cascade for non-partitioned table");
+ }
+ }
+ }
+
+ private static void changeColumn(String oldName, String newName, String
newType, String comment, boolean first,
+ String after, StorageDescriptor sd) {
+ if (first && after != null) {
+ throw new CatalogException("Both first and after
specified for CHANGE COLUMN");
+ }
+ TypeInfo newTypeInfo = calTypeStrToHiveTypeInfo(newType);
+ List<String> oldNames = getFieldNames(sd.getCols());
+ int oldIndex = oldNames.indexOf(oldName);
+ if (oldIndex < 0) {
+ throw new CatalogException(String.format("Old column %s
not found for CHANGE COLUMN", oldName));
+ }
+ FieldSchema newField = new FieldSchema(newName,
newTypeInfo.getTypeName(), comment);
+ if ((!first && after == null) || oldName.equals(after)) {
+ sd.getCols().set(oldIndex, newField);
+ } else {
+ // need change column position
+ sd.getCols().remove(oldIndex);
+ if (first) {
+ sd.getCols().add(0, newField);
+ } else {
+ int newIndex = oldNames.indexOf(after);
+ if (newIndex < 0) {
+ throw new
CatalogException(String.format("After column %s not found for CHANGE COLUMN",
after));
+ }
+ sd.getCols().add(++newIndex, newField);
+ }
+ }
+ }
+
+ private static void addReplaceColumns(StorageDescriptor sd,
List<FieldSchema> columns, boolean replace) {
+ if (replace) {
+ sd.setCols(columns);
+ } else {
+ sd.getCols().addAll(columns);
+ }
+ }
+
+ private static TypeInfo calTypeStrToHiveTypeInfo(String typeStr) {
Review comment:
Add an `AlterTableColumnTypeOperation`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]