jackye1995 commented on a change in pull request #1633: URL: https://github.com/apache/iceberg/pull/1633#discussion_r513701228
########## File path: aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import 
software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.OperationTimeoutException; +import software.amazon.awssdk.services.glue.model.ResourceNumberLimitExceededException; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; + +public class GlueTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(GlueTableOperations.class); + + // same as org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE + // more details: https://docs.aws.amazon.com/glue/latest/webapi/API_TableInput.html + protected static final String GLUE_EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE"; + + private final Configuration conf; + private final GlueClient glue; + private final String catalogId; + private final boolean skipArchive; + private final String databaseName; + private final String tableName; + private final String fullName; + + private FileIO fileIO; + + /** + * All-arg constructor + * @param conf Hadoop config, passed in for users of HadoopIO + * @param glue Glue client + * @param catalogId Glue catalog ID, which is the AWS account ID. When null, it uses the account of the Glue client. + * @param skipArchive if Glue should skip an old table version when creating a new version in a commit. + * By default Glue archives all old table versions after an UpdateTable call. + * but Glue has a default max number of archived table versions (can be increased). + * So for streaming use case with lots of commits, it is recommended to turn this feature off. 
+ * @param tableIdentifier table identifier + */ + public GlueTableOperations( + Configuration conf, + GlueClient glue, + String catalogId, + boolean skipArchive, + TableIdentifier tableIdentifier) { + this.conf = conf; + this.glue = glue; + this.catalogId = catalogId; + this.skipArchive = skipArchive; + this.databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier); + this.tableName = IcebergToGlueConverter.getTableName(tableIdentifier); + this.fullName = String.format("glue.%s.%s.%s", catalogId, databaseName, tableName); + } + + @Override + public FileIO io() { + if (fileIO == null) { + fileIO = new HadoopFileIO(conf); + } + return fileIO; + } + + /** + * Returns table name in the form glue.catalogId.databaseName.tableName + * @return table name + */ + @Override + protected String tableName() { + return fullName; + } + + @Override + protected void doRefresh() { + String metadataLocation = null; + Optional<Table> tableOptional = getGlueTable(); + if (tableOptional.isPresent()) { + Table table = tableOptional.get(); + GlueToIcebergConverter.validateTable(table, tableName()); + metadataLocation = table.parameters().get(METADATA_LOCATION_PROP); + } else { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("Cannot find Glue table %s after refresh, " + + "maybe another process deleted it or revoked your access permission", tableName()); + } + } + refreshFromMetadataLocation(metadataLocation); + } + + @Override + protected void doCommit(TableMetadata base, TableMetadata metadata) { + String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); + boolean exceptionThrown = true; + boolean isUpdate = false; + Table glueTable = null; + try { + Optional<Table> glueTableOptional = getGlueTable(); + if (glueTableOptional.isPresent()) { + glueTable = glueTableOptional.get(); + // If we try to create the table but the metadata location is already set, then we had a concurrent commit + if (base == null && 
glueTable.parameters().get(METADATA_LOCATION_PROP) != null) { + throw new AlreadyExistsException("Cannot commit because table %s already exists in Glue", tableName()); + } + isUpdate = true; + LOG.debug("Committing existing Glue table: {}", tableName()); + } else { + LOG.debug("Committing new Glue table: {}", tableName()); + } + checkMetadataLocation(isUpdate, glueTable, base); + Map<String, String> parameters = isUpdate ? Maps.newHashMap(glueTable.parameters()) : Maps.newHashMap(); + updateParameters(parameters, newMetadataLocation); + persistGlueTable(isUpdate, parameters); + exceptionThrown = false; + } catch (CommitFailedException | AlreadyExistsException e) { + throw e; + } catch (ConcurrentModificationException e) { + throw new CommitFailedException(e, + "Cannot commit %s because Glue detected concurrent update", tableName()); + } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { + throw new AlreadyExistsException(e, + "Cannot commit %s because its Glue table already exists when trying to create one", tableName()); + } catch (OperationTimeoutException | ResourceNumberLimitExceededException e) { + throw new CommitFailedException(e, + "Cannot commit %s because Glue operation or resource limit exceeded, " + + "please consider contact AWS to raise the limit", tableName()); + } catch (Exception e) { + throw new RuntimeException("Unexpected exception during commit for " + tableName(), e); + } finally { + if (exceptionThrown) { + io().deleteFile(newMetadataLocation); + } + } + } + + private void checkMetadataLocation(boolean isUpdate, Table glueTable, TableMetadata base) { + String glueMetadataLocation = isUpdate ? glueTable.parameters().get(METADATA_LOCATION_PROP) : null; + String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; + if (!Objects.equals(baseMetadataLocation, glueMetadataLocation)) { + throw new CommitFailedException( + "Cannot commit %s because base metadata location '%s' is not same as the current Glue location '%s'", + tableName(), baseMetadataLocation, glueMetadataLocation); + } + } + + private Optional<Table> getGlueTable() { + try { + GetTableResponse response = glue.getTable(GetTableRequest.builder() + .catalogId(catalogId) + .databaseName(databaseName) + .name(tableName) + .build()); + return Optional.ofNullable(response.table()); + } catch (EntityNotFoundException e) { + return Optional.empty(); + } + } + + private void updateParameters(Map<String, String> parameters, String newMetadataLocation) { + parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); + parameters.put(METADATA_LOCATION_PROP, newMetadataLocation); + if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { + parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); + } + } + + private void persistGlueTable(boolean isUpdate, Map<String, String> parameters) { + if (isUpdate) { + glue.updateTable(UpdateTableRequest.builder() + .catalogId(catalogId) + .databaseName(databaseName) + .skipArchive(skipArchive) + .tableInput(TableInput.builder() + .name(tableName) + .parameters(parameters) + .build()) + .build()); + } else { + glue.createTable(CreateTableRequest.builder() + .catalogId(catalogId) + .databaseName(databaseName) + .tableInput(TableInput.builder() + .name(tableName) + .owner(System.getProperty("user.name")) Review comment: Agree, I am basically following what Hive is doing here. There is no other meaningful information that can be put as input here because the Glue client cannot provide the caller information. I can also just remove the field if that is cleaner. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org