luoyuxia commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1090119274
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -558,14 +559,14 @@ private HadoopFileSystemFactory fsFactory() {
}
private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(
- String[] partitionColumns, StorageDescriptor sd) {
+ List<String> partitionColumns, StorageDescriptor sd) {
String serLib = sd.getSerdeInfo().getSerializationLib().toLowerCase();
- int formatFieldCount = tableSchema.getFieldCount() -
partitionColumns.length;
+ int formatFieldCount = resolvedSchema.getColumns().size() -
partitionColumns.size();
String[] formatNames = new String[formatFieldCount];
LogicalType[] formatTypes = new LogicalType[formatFieldCount];
for (int i = 0; i < formatFieldCount; i++) {
- formatNames[i] = tableSchema.getFieldName(i).get();
- formatTypes[i] =
tableSchema.getFieldDataType(i).get().getLogicalType();
+ formatNames[i] = resolvedSchema.getColumn(i).get().getName();
+ formatTypes[i] =
resolvedSchema.getColumn(i).get().getDataType().getLogicalType();
Review Comment:
dito
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -111,13 +112,14 @@
@Nullable protected List<Map<String, String>> remainingPartitions = null;
@Nullable protected List<String> dynamicFilterPartitionKeys = null;
protected int[] projectedFields;
Review Comment:
Can `projectedFields` be removed now?
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java:
##########
@@ -97,7 +102,7 @@ public class HiveTableUtil {
private HiveTableUtil() {}
- public static TableSchema createTableSchema(
+ public static ResolvedSchema createResolvedTableSchema(
Review Comment:
Why is `ResolvedSchema`? As for as I'm concerned, it should be `Schema`.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java:
##########
@@ -1178,8 +1179,9 @@ public boolean doPhase1(
+ partition);
break;
}
- CatalogTable catalogTable =
-
getCatalogTable(tableIdentifier.asSummaryString(), qb);
+ ResolvedCatalogTable catalogTable =
+ catalogManager.resolveCatalogTable(
Review Comment:
dito. Can we avoid to call `catalogManager.resolveCatalogTable` ?
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -580,7 +581,11 @@ private Optional<BulkWriter.Factory<RowData>>
createBulkWriterFactory(
TypeDescription typeDescription =
OrcSplitReaderUtil.logicalTypeToOrcType(formatType);
return Optional.of(
hiveShim.createOrcBulkWriterFactory(
- formatConf, typeDescription.toString(),
formatTypes));
+ formatConf,
+ typeDescription.toString(),
+ formatType.getFields().stream()
Review Comment:
use `formatTypes`?
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java:
##########
@@ -2106,7 +2111,7 @@ public static CatalogBaseTable getCatalogBaseTable(
public static class TableSpec {
public ObjectIdentifier tableIdentifier;
public String tableName;
- public CatalogBaseTable table;
+ public ResolvedCatalogBaseTable<?> table;
Review Comment:
Do we really need `ResolvedCatalogBaseTable`? What I mean is can we avoid
to call the method `catalogManager.resolveCatalogBaseTable`? It's a internal
method, we always want to avoid to call it.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java:
##########
@@ -739,13 +739,16 @@ CatalogBaseTable instantiateCatalogTable(Table hiveTable)
{
boolean isHiveTable = isHiveTable(properties);
- TableSchema tableSchema;
// Partition keys
List<String> partitionKeys = new ArrayList<>();
+ TableSchema tableSchema;
Review Comment:
Can we also remove `TableSchema` in here?
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -558,14 +559,14 @@ private HadoopFileSystemFactory fsFactory() {
}
private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(
- String[] partitionColumns, StorageDescriptor sd) {
+ List<String> partitionColumns, StorageDescriptor sd) {
String serLib = sd.getSerdeInfo().getSerializationLib().toLowerCase();
- int formatFieldCount = tableSchema.getFieldCount() -
partitionColumns.length;
+ int formatFieldCount = resolvedSchema.getColumns().size() -
partitionColumns.size();
String[] formatNames = new String[formatFieldCount];
LogicalType[] formatTypes = new LogicalType[formatFieldCount];
for (int i = 0; i < formatFieldCount; i++) {
- formatNames[i] = tableSchema.getFieldName(i).get();
- formatTypes[i] =
tableSchema.getFieldDataType(i).get().getLogicalType();
+ formatNames[i] = resolvedSchema.getColumn(i).get().getName();
Review Comment:
nit:
resolvedSchema.getColumnNames().get(i);
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java:
##########
@@ -112,39 +117,43 @@ public static TableSchema createTableSchema(
HiveTableUtil.relyConstraint((byte) 0));
// PK columns cannot be null
primaryKey.ifPresent(pk -> notNullColumns.addAll(pk.getColumns()));
- return createTableSchema(
+ return createResolvedTableSchema(
fields, hiveTable.getPartitionKeys(), notNullColumns,
primaryKey.orElse(null));
}
/** Create a Flink's TableSchema from Hive table's columns and partition
keys. */
- public static TableSchema createTableSchema(
+ public static ResolvedSchema createResolvedTableSchema(
List<FieldSchema> cols,
List<FieldSchema> partitionKeys,
Set<String> notNullColumns,
- UniqueConstraint primaryKey) {
+ @Nullable UniqueConstraint primaryKey) {
List<FieldSchema> allCols = new ArrayList<>(cols);
allCols.addAll(partitionKeys);
- String[] colNames = new String[allCols.size()];
- DataType[] colTypes = new DataType[allCols.size()];
+ List<Column> columns = new ArrayList<>(allCols.size());
- for (int i = 0; i < allCols.size(); i++) {
- FieldSchema fs = allCols.get(i);
-
- colNames[i] = fs.getName();
- colTypes[i] =
+ for (FieldSchema fs : allCols) {
+ String name = fs.getName();
+ DataType dataType =
HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
- if (notNullColumns.contains(colNames[i])) {
- colTypes[i] = colTypes[i].notNull();
+ if (notNullColumns.contains(name)) {
+ dataType = dataType.notNull();
}
+ columns.add(Column.physical(fs.getName(), dataType));
}
- TableSchema.Builder builder = TableSchema.builder().fields(colNames,
colTypes);
- if (primaryKey != null) {
- builder.primaryKey(
- primaryKey.getName(), primaryKey.getColumns().toArray(new
String[0]));
+ return new ResolvedSchema(
+ columns, Collections.emptyList(),
migrateUniqueConstraint(primaryKey));
+ }
+
+ // TODO upgrade to new UniqueConstraint directly
+ private static org.apache.flink.table.catalog.UniqueConstraint
migrateUniqueConstraint(
+ UniqueConstraint primaryKey) {
+ if (primaryKey == null) {
+ return null;
}
- return builder.build();
+ return org.apache.flink.table.catalog.UniqueConstraint.primaryKey(
+ primaryKey.getName(), primaryKey.getColumns());
}
/** Create Hive columns from Flink TableSchema. */
Review Comment:
Can this method be removed?
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -1966,61 +1982,60 @@ private Operation convertAlterTableModifyCols(
}
ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
- CatalogTable oldTable = (CatalogTable) alteredTable;
+ ResolvedCatalogTable oldTable =
+ catalogManager.resolveCatalogTable((CatalogTable)
alteredTable);
// prepare properties
Map<String, String> props = new HashMap<>(oldTable.getOptions());
props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name());
if (isCascade) {
props.put(ALTER_COL_CASCADE, "true");
}
- TableSchema oldSchema = oldTable.getSchema();
+ ResolvedSchema oldSchema = oldTable.getResolvedSchema();
final int numPartCol = oldTable.getPartitionKeys().size();
- TableSchema.Builder builder = TableSchema.builder();
// add existing non-part col if we're not replacing
+ List<Column> newColumns = new ArrayList<>();
if (!replace) {
- List<TableColumn> nonPartCols =
- oldSchema.getTableColumns().subList(0,
oldSchema.getFieldCount() - numPartCol);
- for (TableColumn column : nonPartCols) {
- builder.add(column);
- }
- setWatermarkAndPK(builder, oldSchema);
+ List<Column> nonPartCols =
+ oldSchema.getColumns().subList(0,
oldSchema.getColumnCount() - numPartCol);
+
+ newColumns.addAll(nonPartCols);
}
// add new cols
for (FieldSchema col : newCols) {
- builder.add(
- TableColumn.physical(
+ newColumns.add(
+ Column.physical(
col.getName(),
HiveTypeUtil.toFlinkType(
TypeInfoUtils.getTypeInfoFromTypeString(col.getType()))));
}
// add part cols
- List<TableColumn> partCols =
+ List<Column> partCols =
oldSchema
- .getTableColumns()
- .subList(oldSchema.getFieldCount() - numPartCol,
oldSchema.getFieldCount());
- for (TableColumn column : partCols) {
- builder.add(column);
+ .getColumns()
+ .subList(
+ oldSchema.getColumnCount() - numPartCol,
+ oldSchema.getColumnCount());
+ newColumns.addAll(partCols);
+ ResolvedSchema newSchema;
+ if (!replace) {
+ newSchema =
+ new ResolvedSchema(
+ newColumns,
+ oldSchema.getWatermarkSpecs(),
+ oldSchema.getPrimaryKey().orElse(null));
+ } else {
+ newSchema = ResolvedSchema.of(newColumns);
}
return new AlterTableSchemaOperation(
tableIdentifier,
- new CatalogTableImpl(
- builder.build(),
- oldTable.getPartitionKeys(),
- props,
- oldTable.getComment()));
- }
-
- private static void setWatermarkAndPK(TableSchema.Builder builder,
TableSchema schema) {
- for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) {
- builder.watermark(watermarkSpec);
- }
- schema.getPrimaryKey()
- .ifPresent(
- pk -> {
- builder.primaryKey(
- pk.getName(), pk.getColumns().toArray(new
String[0]));
- });
+ new ResolvedCatalogTable(
+ CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(newSchema).build(),
+ oldTable.getComment(),
+ oldTable.getPartitionKeys(),
+ props),
+ newSchema));
}
Review Comment:
Can the deprecated `TableSchema` be removed in method
`convertAlterTableChangeCol`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]