mchades commented on code in PR #9073: URL: https://github.com/apache/gravitino/pull/9073#discussion_r2517756793
########## core/src/main/java/org/apache/gravitino/catalog/ManagedTableOperations.java: ########## @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.connector.GenericColumn; +import org.apache.gravitino.connector.GenericTable; +import org.apache.gravitino.connector.SupportsSchemas; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; 
+import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.ColumnEntity; +import org.apache.gravitino.meta.TableEntity; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.Expression; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; +import org.apache.gravitino.rel.types.Type; +import org.apache.gravitino.storage.IdGenerator; +import org.apache.gravitino.utils.PrincipalUtils; + +public abstract class ManagedTableOperations implements TableCatalog { + + private static final Joiner DOT = Joiner.on("."); + + protected abstract EntityStore store(); + + protected abstract SupportsSchemas schemas(); + + protected abstract IdGenerator idGenerator(); + + @Override + public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { + try { + NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels()); + schemas().loadSchema(schemaIdent); // verify schema existence + + List<TableEntity> tables = + store().list(namespace, TableEntity.class, Entity.EntityType.TABLE); + return tables.stream() + .map(t -> NameIdentifier.of(namespace, t.name())) + .toArray(NameIdentifier[]::new); + + } catch (NoSuchEntityException e) { + throw new NoSuchSchemaException(e, "Schema %s does not exist", namespace); + } catch (IOException e) { + throw new RuntimeException("Failed to list tables in schema " + namespace, e); + } + } + + @Override + public Table loadTable(NameIdentifier ident) throws 
NoSuchTableException { + try { + TableEntity tableEntity = store().get(ident, Entity.EntityType.TABLE, TableEntity.class); + return toGenericTable(tableEntity); + + } catch (NoSuchEntityException e) { + throw new NoSuchTableException(e, "Table %s does not exist", ident); + } catch (IOException e) { + throw new RuntimeException("Failed to load table " + ident, e); + } + } + + @Override + public Table createTable( + NameIdentifier ident, + Column[] columns, + String comment, + Map<String, String> properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders, + Index[] indexes) + throws NoSuchSchemaException, TableAlreadyExistsException { + // createTable in ManagedTableOperations only stores the table metadata in the entity store. + // It doesn't handle any additional operations like creating physical location, preprocessing + // the properties, etc. Those operations should be handled in the specific catalog + // implementation. + + NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); + + if (!schemas().schemaExists(schemaIdent)) { + throw new NoSuchSchemaException("Schema %s does not exist", schemaIdent); + } Review Comment: This schema check seems unnecessary, since we can catch `NoSuchEntityException` when calling `store().put(tableEntity, false /* overwrite */)` ########## core/src/main/java/org/apache/gravitino/catalog/ManagedTableOperations.java: ########## @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.connector.GenericColumn; +import org.apache.gravitino.connector.GenericTable; +import org.apache.gravitino.connector.SupportsSchemas; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.ColumnEntity; +import org.apache.gravitino.meta.TableEntity; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.Expression; +import 
org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; +import org.apache.gravitino.rel.types.Type; +import org.apache.gravitino.storage.IdGenerator; +import org.apache.gravitino.utils.PrincipalUtils; + +public abstract class ManagedTableOperations implements TableCatalog { + + private static final Joiner DOT = Joiner.on("."); + + protected abstract EntityStore store(); + + protected abstract SupportsSchemas schemas(); + + protected abstract IdGenerator idGenerator(); + + @Override + public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { + try { + NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels()); + schemas().loadSchema(schemaIdent); // verify schema existence Review Comment: It seems unnecessary, since `store().list(namespace, TableEntity.class, Entity.EntityType.TABLE)` will automatically determine if the schema exists. ########## core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java: ########## @@ -47,11 +47,16 @@ protected GenericLakehouseTable internalBuild() { GenericLakehouseTable genericLakehouseTable = new GenericLakehouseTable(); genericLakehouseTable.columns = this.columns; genericLakehouseTable.comment = this.comment; - genericLakehouseTable.properties = - ImmutableMap.<String, String>builder() - .putAll(this.properties) - .put(Table.PROPERTY_TABLE_FORMAT, this.format) - .buildKeepingLast(); + + if (format != null) { + genericLakehouseTable.properties = + ImmutableMap.<String, String>builder() + .putAll(this.properties) + .put(Table.PROPERTY_TABLE_FORMAT, this.format) + .buildKeepingLast(); + } else { Review Comment: Should we allow the `format` property in `GenericLakehouseTable` to be null? 
########## core/src/main/java/org/apache/gravitino/catalog/ManagedTableOperations.java: ########## @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityAlreadyExistsException; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.connector.GenericColumn; +import org.apache.gravitino.connector.GenericTable; +import org.apache.gravitino.connector.SupportsSchemas; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; 
+import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.ColumnEntity; +import org.apache.gravitino.meta.TableEntity; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.Expression; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; +import org.apache.gravitino.rel.types.Type; +import org.apache.gravitino.storage.IdGenerator; +import org.apache.gravitino.utils.PrincipalUtils; + +public abstract class ManagedTableOperations implements TableCatalog { + + private static final Joiner DOT = Joiner.on("."); + + protected abstract EntityStore store(); + + protected abstract SupportsSchemas schemas(); + + protected abstract IdGenerator idGenerator(); + + @Override + public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { + try { + NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels()); + schemas().loadSchema(schemaIdent); // verify schema existence + + List<TableEntity> tables = + store().list(namespace, TableEntity.class, Entity.EntityType.TABLE); + return tables.stream() + .map(t -> NameIdentifier.of(namespace, t.name())) + .toArray(NameIdentifier[]::new); + + } catch (NoSuchEntityException e) { + throw new NoSuchSchemaException(e, "Schema %s does not exist", namespace); + } catch (IOException e) { + throw new RuntimeException("Failed to list tables in schema " + namespace, e); + } + } + + @Override + public Table loadTable(NameIdentifier ident) throws 
NoSuchTableException { + try { + TableEntity tableEntity = store().get(ident, Entity.EntityType.TABLE, TableEntity.class); + return toGenericTable(tableEntity); + + } catch (NoSuchEntityException e) { + throw new NoSuchTableException(e, "Table %s does not exist", ident); + } catch (IOException e) { + throw new RuntimeException("Failed to load table " + ident, e); + } + } + + @Override + public Table createTable( + NameIdentifier ident, + Column[] columns, + String comment, + Map<String, String> properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders, + Index[] indexes) + throws NoSuchSchemaException, TableAlreadyExistsException { + // createTable in ManagedTableOperations only stores the table metadata in the entity store. + // It doesn't handle any additional operations like creating physical location, preprocessing + // the properties, etc. Those operations should be handled in the specific catalog + // implementation. + + NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); + + if (!schemas().schemaExists(schemaIdent)) { + throw new NoSuchSchemaException("Schema %s does not exist", schemaIdent); + } + + StringIdentifier stringId = StringIdentifier.fromProperties(properties); + Preconditions.checkArgument(stringId != null, "Property String identifier should not be null"); + + AuditInfo auditInfo = + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build(); + + TableEntity tableEntity = + TableEntity.builder() + .withName(ident.name()) + .withId(stringId.id()) + .withNamespace(ident.namespace()) + .withComment(comment) + .withColumns(toColumnEntities(columns, auditInfo, idGenerator())) + .withProperties(properties) + .withPartitioning(partitions) + .withDistribution(distribution) + .withSortOrders(sortOrders) + .withIndexes(indexes) + .withAuditInfo(auditInfo) + .build(); + + try { + store().put(tableEntity, false /* overwrite */); + } 
catch (EntityAlreadyExistsException e) { + throw new TableAlreadyExistsException(e, "Table %s already exists", ident); + } catch (IOException e) { + throw new RuntimeException("Failed to create table " + ident, e); + } + + return toGenericTable(tableEntity); + } + + @Override + public Table alterTable(NameIdentifier ident, TableChange... changes) + throws NoSuchTableException, IllegalArgumentException { + // The alterTable in ManagedTableOperations only updates the table metadata in the entity store. + // It doesn't handle any additional operations like modifying physical data, etc. Those + // operations should be handled in the specific catalog implementation. + try { + TableEntity newTableEntity = + store() + .update( + ident, + TableEntity.class, + Entity.EntityType.TABLE, + oldEntity -> applyChanges(oldEntity, changes)); + + return toGenericTable(newTableEntity); + } catch (NoSuchEntityException e) { + throw new NoSuchTableException(e, "Table %s does not exist", ident); + } catch (EntityAlreadyExistsException e) { + throw new IllegalArgumentException( + "Failed to rename table " + ident + " due to table already exists: ", e); + } catch (IOException e) { + throw new RuntimeException("Failed to alter table " + ident, e); + } + } + + @Override + public boolean purgeTable(NameIdentifier ident) { + // For Gravitino managed tables, purgeTable is equivalent to dropTable. It only removes the + // table metadata from the entity store. Physical data deletion should be handled by the + // specific catalog implementation if needed. + return dropTable(ident); + } + + @Override + public boolean dropTable(NameIdentifier ident) { + try { + return store().delete(ident, Entity.EntityType.TABLE); + } catch (NoSuchEntityException e) { + return false; + } catch (IOException e) { + throw new RuntimeException("Failed to drop metadata for table " + ident, e); + } + } + + private TableEntity applyChanges(TableEntity oldTableEntity, TableChange... 
changes) { + String newName = oldTableEntity.name(); + String newComment = oldTableEntity.comment(); + Map<String, String> newProps = Maps.newHashMap(oldTableEntity.properties()); + List<ColumnEntity> newColumns = Lists.newArrayList(oldTableEntity.columns()); + List<Index> newIndexes = Lists.newArrayList(oldTableEntity.indexes()); + + Map<Boolean, List<TableChange>> splitChanges = + Arrays.stream(changes) + .collect( + Collectors.partitioningBy(change -> change instanceof TableChange.ColumnChange)); + List<TableChange.ColumnChange> columnChanges = + splitChanges.get(true).stream() + .map(change -> (TableChange.ColumnChange) change) + .collect(Collectors.toList()); + List<TableChange> tableChanges = splitChanges.get(false); + + for (TableChange change : tableChanges) { + if (change instanceof TableChange.RenameTable rename) { + if (rename.getNewSchemaName().isPresent()) { + throw new IllegalArgumentException( + "Gravitino managed table doesn't support renaming " + + "the table across schemas for now"); Review Comment: Will we support renaming tables across schemas later? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
