This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch hudi-aws-lombokify in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fddcf32b8131604160a24e45d3a2788ae78e1419 Author: voon <[email protected]> AuthorDate: Mon Dec 8 19:52:46 2025 +0800 Apply lombok annotations remove boilerplate code to hudi-aws --- hudi-aws/pom.xml | 6 ++ ...dieConfigAWSAssumedRoleCredentialsProvider.java | 6 +- .../HoodieConfigAWSCredentialsProvider.java | 8 +-- .../cloudwatch/CloudWatchMetricsReporter.java | 10 ++- .../aws/metrics/cloudwatch/CloudWatchReporter.java | 12 ++-- .../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 81 ++++++++++------------ .../lock/DynamoDBBasedLockProviderBase.java | 7 +- .../aws/transaction/lock/S3StorageLockClient.java | 7 +- .../apache/hudi/aws/utils/DynamoTableUtils.java | 16 ++--- .../hudi/aws/sync/ITTestGluePartitionPushdown.java | 6 +- 10 files changed, 69 insertions(+), 90 deletions(-) diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml index 7c0e146869b0..c013d92b581f 100644 --- a/hudi-aws/pom.xml +++ b/hudi-aws/pom.xml @@ -41,6 +41,12 @@ <artifactId>log4j-1.2-api</artifactId> </dependency> + <!-- Lombok --> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> + <!-- Hoodie --> <dependency> <groupId>org.apache.hudi</groupId> diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSAssumedRoleCredentialsProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSAssumedRoleCredentialsProvider.java index 292f4722da5b..55b16bad2b10 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSAssumedRoleCredentialsProvider.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSAssumedRoleCredentialsProvider.java @@ -21,8 +21,7 @@ package org.apache.hudi.aws.credentials; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieAWSConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.services.sts.StsClient; @@ -34,10 +33,9 @@ import java.util.Properties; /** * Credentials provider which assumes AWS role from Hoodie config and fetches its credentials. */ +@Slf4j public class HoodieConfigAWSAssumedRoleCredentialsProvider implements AwsCredentialsProvider { - private static final Logger LOG = LoggerFactory.getLogger(HoodieConfigAWSAssumedRoleCredentialsProvider.class); - private final StsAssumeRoleCredentialsProvider credentialsProvider; public HoodieConfigAWSAssumedRoleCredentialsProvider(Properties props) { diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java index 105a4ffbb649..d8afb8a2531b 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java @@ -21,8 +21,7 @@ package org.apache.hudi.aws.credentials; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieAWSConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -33,10 +32,9 @@ import java.util.Properties; /** * Credentials provider which fetches AWS access key from Hoodie config. */ +@Slf4j public class HoodieConfigAWSCredentialsProvider implements AwsCredentialsProvider { - private static final Logger LOG = LoggerFactory.getLogger(HoodieConfigAWSCredentialsProvider.class); - private AwsCredentials awsCredentials; public HoodieConfigAWSCredentialsProvider(Properties props) { @@ -45,7 +43,7 @@ public class HoodieConfigAWSCredentialsProvider implements AwsCredentialsProvide String sessionToken = props.getProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key()); if (StringUtils.isNullOrEmpty(accessKey) || StringUtils.isNullOrEmpty(secretKey)) { - LOG.debug("AWS access key or secret key not found in the Hudi configuration. " + log.debug("AWS access key or secret key not found in the Hudi configuration. " + "Use default AWS credentials"); } else { this.awsCredentials = createCredentials(accessKey, secretKey, sessionToken); diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchMetricsReporter.java b/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchMetricsReporter.java index 03f71410b36d..b47935bc3293 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchMetricsReporter.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchMetricsReporter.java @@ -23,8 +23,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.metrics.MetricsReporter; import com.codahale.metrics.MetricRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; @@ -32,10 +31,9 @@ import java.util.concurrent.TimeUnit; * Hudi Amazon CloudWatch metrics reporter. Responsible for reading Hoodie metrics configurations and hooking up with * {@link org.apache.hudi.metrics.Metrics}. Internally delegates reporting tasks to {@link CloudWatchReporter}. */ +@Slf4j public class CloudWatchMetricsReporter extends MetricsReporter { - private static final Logger LOG = LoggerFactory.getLogger(CloudWatchMetricsReporter.class); - private final MetricRegistry registry; private final HoodieMetricsConfig metricsConfig; private final CloudWatchReporter reporter; @@ -70,7 +68,7 @@ public class CloudWatchMetricsReporter extends MetricsReporter { @Override public void start() { - LOG.info("Starting CloudWatch Metrics Reporter."); + log.info("Starting CloudWatch Metrics Reporter."); reporter.start(metricsConfig.getCloudWatchReportPeriodSeconds(), TimeUnit.SECONDS); } @@ -81,7 +79,7 @@ public class CloudWatchMetricsReporter extends MetricsReporter { @Override public void stop() { - LOG.info("Stopping CloudWatch Metrics Reporter."); + log.info("Stopping CloudWatch Metrics Reporter."); reporter.stop(); } } diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchReporter.java b/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchReporter.java index 037bab52db16..ba9abc55bef7 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchReporter.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchReporter.java @@ -32,8 +32,7 @@ import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.ScheduledReporter; import com.codahale.metrics.Timer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.cloudwatch.model.Dimension; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; @@ -54,6 +53,7 @@ import java.util.concurrent.TimeUnit; * A reporter for publishing metrics to Amazon CloudWatch. It is responsible for collecting, converting DropWizard * metrics to CloudWatch metrics and composing metrics payload. */ +@Slf4j public class CloudWatchReporter extends ScheduledReporter { static final String DIMENSION_TABLE_NAME_KEY = "Table"; @@ -61,8 +61,6 @@ public class CloudWatchReporter extends ScheduledReporter { static final String DIMENSION_GAUGE_TYPE_VALUE = "gauge"; static final String DIMENSION_COUNT_TYPE_VALUE = "count"; - private static final Logger LOG = LoggerFactory.getLogger(CloudWatchReporter.class); - private final CloudWatchAsyncClient cloudWatchClientAsync; private final Clock clock; private final String prefix; @@ -181,7 +179,7 @@ public class CloudWatchReporter extends ScheduledReporter { SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) { - LOG.info("Reporting Metrics to CloudWatch."); + log.info("Reporting Metrics to CloudWatch."); final long timestampMilliSec = clock.getTime(); List<MetricDatum> metricsData = new ArrayList<>(); @@ -234,7 +232,7 @@ public class CloudWatchReporter extends ScheduledReporter { try { cloudWatchFuture.get(30, TimeUnit.SECONDS); } catch (final Exception ex) { - LOG.error("Error reporting metrics to CloudWatch. The data in this CloudWatch request " + log.error("Error reporting metrics to CloudWatch. The data in this CloudWatch request " + "may have been discarded, and not made it to CloudWatch.", ex); if (ex instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -316,7 +314,7 @@ public class CloudWatchReporter extends ScheduledReporter { try { cloudWatchClientAsync.close(); } catch (Exception ex) { - LOG.warn("Exception while shutting down CloudWatch client.", ex); + log.warn("Exception while shutting down CloudWatch client.", ex); } } } diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java index dab55d6b4f1d..2dc45a255a63 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -38,9 +38,9 @@ import org.apache.hudi.sync.common.HoodieSyncClient; import org.apache.hudi.sync.common.model.FieldSchema; import org.apache.hudi.sync.common.model.Partition; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.parquet.schema.MessageType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.glue.GlueAsyncClient; import software.amazon.awssdk.services.glue.GlueAsyncClientBuilder; @@ -110,14 +110,14 @@ import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty; import static org.apache.hudi.config.GlueCatalogSyncClientConfig.ALL_PARTITIONS_READ_PARALLELISM; import static org.apache.hudi.config.GlueCatalogSyncClientConfig.CHANGED_PARTITIONS_READ_PARALLELISM; import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_METADATA_FILE_LISTING; +import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_DATABASE_NAME; +import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_RESOURCE_TAGS; +import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_TABLE_NAME; import static org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS; import static org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS_ENABLE; import static org.apache.hudi.config.GlueCatalogSyncClientConfig.PARTITION_CHANGE_PARALLELISM; import static org.apache.hudi.config.HoodieAWSConfig.AWS_GLUE_ENDPOINT; import static org.apache.hudi.config.HoodieAWSConfig.AWS_GLUE_REGION; -import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_DATABASE_NAME; -import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_RESOURCE_TAGS; -import static org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_TABLE_NAME; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE; import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType; @@ -132,9 +132,8 @@ import static org.apache.hudi.sync.common.util.TableUtils.tableId; * * @Experimental */ +@Slf4j public class AWSGlueCatalogSyncClient extends HoodieSyncClient { - - private static final Logger LOG = LoggerFactory.getLogger(AWSGlueCatalogSyncClient.class); private static final int MAX_PARTITIONS_PER_CHANGE_REQUEST = 100; private static final int MAX_PARTITIONS_PER_READ_REQUEST = 1000; private static final int MAX_DELETE_PARTITIONS_PER_REQUEST = 25; @@ -148,7 +147,9 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { private static final String ENABLE_MDT_LISTING = "hudi.metadata-listing-enabled"; private static final String GLUE_TABLE_ARN_FORMAT = "arn:aws:glue:%s:%s:table/%s/%s"; private static final String GLUE_DATABASE_ARN_FORMAT = "arn:aws:glue:%s:%s:database/%s"; + @Getter private final String databaseName; + @Getter private final String tableName; private final boolean skipTableArchive; @@ -191,16 +192,6 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { } } - @Override - public String getTableName() { - return this.tableName; - } - - @Override - public String getDatabaseName() { - return this.databaseName; - } - private List<Partition> getPartitionsSegment(Segment segment, String tableName) { try { List<Partition> partitions = new ArrayList<>(); @@ -259,7 +250,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { @Override public List<Partition> getPartitionsFromList(String tableName, List<String> partitionList) { if (partitionList.isEmpty()) { - LOG.info("No partitions to read for " + tableId(this.databaseName, tableName)); + log.info("No partitions to read for " + tableId(this.databaseName, tableName)); return Collections.emptyList(); } HoodieTimer timer = HoodieTimer.start(); @@ -278,7 +269,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { for (Future<List<Partition>> future : futures) { partitions.addAll(future.get()); } - LOG.info( + log.info( "Requested {} partitions, found existing {} partitions, new {} partitions", partitionList.size(), partitions.size(), @@ -289,7 +280,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(this.databaseName, tableName), e); } finally { executorService.shutdownNow(); - LOG.info("Took {} ms to get {} partitions for table {}", timer.endTimer(), partitionList.size(), tableId(this.databaseName, tableName)); + log.info("Took {} ms to get {} partitions for table {}", timer.endTimer(), partitionList.size(), tableId(this.databaseName, tableName)); } } @@ -317,13 +308,13 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { HoodieTimer timer = HoodieTimer.start(); try { if (partitionsToAdd.isEmpty()) { - LOG.info("No partitions to add for " + tableId(this.databaseName, tableName)); + log.info("No partitions to add for " + tableId(this.databaseName, tableName)); return; } Table table = getTable(awsGlue, databaseName, tableName); parallelizeChange(partitionsToAdd, this.changeParallelism, partitions -> this.addPartitionsToTableInternal(table, partitions), MAX_PARTITIONS_PER_CHANGE_REQUEST); } finally { - LOG.info("Added {} partitions to table {} in {} ms", partitionsToAdd.size(), tableId(this.databaseName, tableName), timer.endTimer()); + log.info("Added {} partitions to table {} in {} ms", partitionsToAdd.size(), tableId(this.databaseName, tableName), timer.endTimer()); } } @@ -364,7 +355,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { if (response.errors().stream() .allMatch( (error) -> "AlreadyExistsException".equals(error.errorDetail().errorCode()))) { - LOG.info("Partitions already exist in glue: {}", response.errors()); + log.info("Partitions already exist in glue: {}", response.errors()); } else { throw new HoodieGlueSyncException("Fail to add partitions to " + tableId(databaseName, table.name()) + " with error(s): " + response.errors()); @@ -380,13 +371,13 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { HoodieTimer timer = HoodieTimer.start(); try { if (changedPartitions.isEmpty()) { - LOG.info("No partitions to update for " + tableId(this.databaseName, tableName)); + log.info("No partitions to update for " + tableId(this.databaseName, tableName)); return; } Table table = getTable(awsGlue, databaseName, tableName); parallelizeChange(changedPartitions, this.changeParallelism, partitions -> this.updatePartitionsToTableInternal(table, partitions), MAX_PARTITIONS_PER_CHANGE_REQUEST); } finally { - LOG.info("Updated {} partitions to table {} in {} ms", changedPartitions.size(), tableId(this.databaseName, tableName), timer.endTimer()); + log.info("Updated {} partitions to table {} in {} ms", changedPartitions.size(), tableId(this.databaseName, tableName), timer.endTimer()); } } @@ -420,12 +411,12 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { HoodieTimer timer = HoodieTimer.start(); try { if (partitionsToDrop.isEmpty()) { - LOG.info("No partitions to drop for " + tableId(this.databaseName, tableName)); + log.info("No partitions to drop for " + tableId(this.databaseName, tableName)); return; } parallelizeChange(partitionsToDrop, this.changeParallelism, partitions -> this.dropPartitionsInternal(tableName, partitions), MAX_DELETE_PARTITIONS_PER_REQUEST); } finally { - LOG.info("Deleted {} partitions to table {} in {} ms", partitionsToDrop.size(), tableId(this.databaseName, tableName), timer.endTimer()); + log.info("Deleted {} partitions to table {} in {} ms", partitionsToDrop.size(), tableId(this.databaseName, tableName), timer.endTimer()); } } @@ -573,7 +564,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { // TODO: skip cascading when new fields in structs are added to the schema in last position boolean cascade = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0 && !schemaDiff.getUpdateColumnTypes().isEmpty(); if (cascade) { - LOG.info("Cascading column changes to partitions"); + log.info("Cascading column changes to partitions"); List<String> allPartitions = getAllPartitions(tableName).stream() .map(partition -> getStringFromPartition(table.partitionKeys(), partition.getValues())) .collect(Collectors.toList()); @@ -695,9 +686,9 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { .build(); CreateTableResponse response = awsGlue.createTable(request).get(); - LOG.info("Created table {} : {}", tableId(databaseName, tableName), response); + log.info("Created table {} : {}", tableId(databaseName, tableName), response); } catch (AlreadyExistsException e) { - LOG.warn("Table {} already exists.", tableId(databaseName, tableName), e); + log.warn("Table {} already exists.", tableId(databaseName, tableName), e); } catch (Exception e) { throw new HoodieGlueSyncException("Fail to create " + tableId(databaseName, tableName), e); } @@ -716,14 +707,14 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { if (!config.getBooleanOrDefault(META_SYNC_PARTITION_INDEX_FIELDS_ENABLE)) { // deactivate indexing if enabled if (getPartitionIndexEnable(tableName)) { - LOG.info("Deactivating partition indexing"); + log.info("Deactivating partition indexing"); updatePartitionIndexEnable(tableName, false); } // also drop all existing indexes GetPartitionIndexesRequest indexesRequest = GetPartitionIndexesRequest.builder().databaseName(databaseName).tableName(tableName).build(); GetPartitionIndexesResponse existingIdxsResp = awsGlue.getPartitionIndexes(indexesRequest).get(); for (PartitionIndexDescriptor idsToDelete : existingIdxsResp.partitionIndexDescriptorList()) { - LOG.info("Dropping partition index: {}", idsToDelete.indexName()); + log.info("Dropping partition index: {}", idsToDelete.indexName()); DeletePartitionIndexRequest idxToDelete = DeletePartitionIndexRequest.builder() .databaseName(databaseName).tableName(tableName).indexName(idsToDelete.indexName()).build(); awsGlue.deletePartitionIndex(idxToDelete).get(); @@ -731,7 +722,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { } else { // activate indexing usage if disabled if (!getPartitionIndexEnable(tableName)) { - LOG.info("Activating partition indexing"); + log.info("Activating partition indexing"); updatePartitionIndexEnable(tableName, true); } @@ -756,7 +747,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { indexesChanges = true; DeletePartitionIndexRequest idxToDelete = DeletePartitionIndexRequest.builder() .databaseName(databaseName).tableName(tableName).indexName(existingIdx.indexName()).build(); - LOG.info("Dropping irrelevant index: {}", existingIdx.indexName()); + log.info("Dropping irrelevant index: {}", existingIdx.indexName()); awsGlue.deletePartitionIndex(idxToDelete).get(); } } @@ -778,7 +769,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { PartitionIndex newIdx = PartitionIndex.builder() .indexName(newIdxName) .keys(neededIdx).build(); - LOG.info("Creating new partition index: {}", newIdxName); + log.info("Creating new partition index: {}", newIdxName); CreatePartitionIndexRequest creationRequest = CreatePartitionIndexRequest.builder() .databaseName(databaseName).tableName(tableName).partitionIndex(newIdx).build(); awsGlue.createPartitionIndex(creationRequest).get(); @@ -794,7 +785,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { .map(idx -> Arrays.stream(idx.split(";")) .collect(Collectors.toList())).collect(Collectors.toList()); if (indexes.size() > PARTITION_INDEX_MAX_NUMBER) { - LOG.warn("Only considering first {} indexes", PARTITION_INDEX_MAX_NUMBER); + log.warn("Only considering first {} indexes", PARTITION_INDEX_MAX_NUMBER); return indexes.subList(0, PARTITION_INDEX_MAX_NUMBER); } return indexes; @@ -853,7 +844,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { return Objects.nonNull(table); } catch (ExecutionException e) { if (e.getCause() instanceof EntityNotFoundException) { - LOG.info("Table not found: {}.{}", databaseName, tableName, e); + log.info("Table not found: {}.{}", databaseName, tableName, e); return false; } else { throw new HoodieGlueSyncException("Fail to get table: " + tableId(databaseName, tableName), e); @@ -873,7 +864,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { return Objects.nonNull(awsGlue.getDatabase(request).get().database()); } catch (ExecutionException e) { if (e.getCause() instanceof EntityNotFoundException) { - LOG.info("Database not found: {}", databaseName, e); + log.info("Database not found: {}", databaseName, e); return false; } else { throw new HoodieGlueSyncException("Fail to check if database exists " + databaseName, e); @@ -900,9 +891,9 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { try { CreateDatabaseResponse result = awsGlue.createDatabase(request).get(); tagResource(String.format(GLUE_DATABASE_ARN_FORMAT, awsGlue.serviceClientConfiguration().region(), catalogId, databaseName)); - LOG.info("Successfully created database in AWS Glue: {}", result.toString()); + log.info("Successfully created database in AWS Glue: {}", result.toString()); } catch (AlreadyExistsException e) { - LOG.info("AWS Glue Database {} already exists", databaseName, e); + log.info("AWS Glue Database {} already exists", databaseName, e); } catch (Exception e) { throw new HoodieGlueSyncException("Fail to create database " + databaseName, e); } @@ -950,7 +941,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { throw new HoodieGlueSyncException("Fail to update last sync commit time for " + tableId(databaseName, tableName), e); } } else { - LOG.info("No commit in active timeline."); + log.info("No commit in active timeline."); } try { // as a side effect, we also refresh the partition indexes if needed @@ -958,9 +949,9 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { // therefore we call this at each commit as a workaround managePartitionIndexes(tableName); } catch (ExecutionException e) { - LOG.warn("An indexation process is currently running.", e); + log.warn("An indexation process is currently running.", e); } catch (Exception e) { - LOG.warn("Something went wrong with partition index", e); + log.warn("Something went wrong with partition index", e); } } @@ -974,7 +965,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { try { awsGlue.deleteTable(deleteTableRequest).get(); - LOG.info("Successfully deleted table in AWS Glue: {}.{}", databaseName, tableName); + log.info("Successfully deleted table in AWS Glue: {}.{}", databaseName, tableName); } catch (Exception e) { if (e instanceof InterruptedException) { // In case {@code InterruptedException} was thrown, resetting the interrupted flag @@ -1040,7 +1031,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { boolean different = serdeProperties.entrySet().stream().anyMatch(e -> !existingSerdeProperties.containsKey(e.getKey()) || !existingSerdeProperties.get(e.getKey()).equals(e.getValue())); if (!different) { - LOG.debug("Table {} serdeProperties already up to date, skip update serde properties.", tableName); + log.debug("Table {} serdeProperties already up to date, skip update serde properties.", tableName); return false; } } diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java index 66c1f462afc0..c956b5371c77 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java @@ -33,6 +33,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions; import com.amazonaws.services.dynamodbv2.LockItem; import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException; +import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.Region; @@ -75,6 +76,7 @@ public abstract class DynamoDBBasedLockProviderBase implements LockProvider<Lock protected final AmazonDynamoDBLockClient client; protected final String tableName; protected final String dynamoDBPartitionKey; + @Getter protected volatile LockItem lock; protected DynamoDBBasedLockProviderBase(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) { @@ -156,11 +158,6 @@ public abstract class DynamoDBBasedLockProviderBase implements LockProvider<Lock } } - @Override - public LockItem getLock() { - return lock; - } - private static DynamoDbClient getDynamoDBClient(DynamoDbBasedLockConfig dynamoDbBasedLockConfig) { String region = dynamoDbBasedLockConfig.getString(DYNAMODB_LOCK_REGION); String endpointURL = dynamoDbBasedLockConfig.contains(DYNAMODB_ENDPOINT_URL.key()) diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java index c6034ca80783..4a6fa605581d 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java @@ -32,8 +32,8 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkClientException; @@ -63,10 +63,9 @@ import static org.apache.hudi.config.StorageBasedLockConfig.VALIDITY_TIMEOUT_SEC * S3-based distributed lock client using ETag checks (AWS SDK v2). * See RFC: <a href="https://github.com/apache/hudi/blob/master/rfc/rfc-91/rfc-91.md">RFC-91</a> */ +@Slf4j @ThreadSafe public class S3StorageLockClient implements StorageLockClient { - - private static final Logger LOG = LoggerFactory.getLogger(S3StorageLockClient.class); private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; private static final int NOT_FOUND_ERROR_CODE = 404; private static final int CONDITIONAL_REQUEST_CONFLICT_ERROR_CODE = 409; @@ -87,7 +86,7 @@ public class S3StorageLockClient implements StorageLockClient { * @param props The properties for the lock config, can be used to customize client. */ public S3StorageLockClient(String ownerId, String lockFileUri, Properties props) { - this(ownerId, lockFileUri, props, createDefaultS3Client(), LOG); + this(ownerId, lockFileUri, props, createDefaultS3Client(), log); } @VisibleForTesting diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java b/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java index b507ca2bf478..ace74a1fbc79 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java @@ -18,8 +18,7 @@ package org.apache.hudi.aws.utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; @@ -45,14 +44,11 @@ import software.amazon.awssdk.services.dynamodb.model.TableStatus; * // ... start making calls to table ... * </pre> */ +@Slf4j public class DynamoTableUtils { private static final int DEFAULT_WAIT_TIMEOUT = 20 * 60 * 1000; private static final int DEFAULT_WAIT_INTERVAL = 10 * 1000; - /** - * The logging utility. - */ - private static final Logger LOGGER = LoggerFactory.getLogger(DynamoTableUtils.class); /** * Waits up to 10 minutes for a specified DynamoDB table to resolve, @@ -225,8 +221,8 @@ public class DynamoTableUtils { dynamo.createTable(createTableRequest); return true; } catch (final ResourceInUseException e) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Table " + createTableRequest.tableName() + " already exists", e); + if (log.isTraceEnabled()) { + log.trace("Table " + createTableRequest.tableName() + " already exists", e); } } return false; @@ -243,8 +239,8 @@ public class DynamoTableUtils { dynamo.deleteTable(deleteTableRequest); return true; } catch (final ResourceNotFoundException e) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Table " + deleteTableRequest.tableName() + " does not exist", e); + if (log.isTraceEnabled()) { + log.trace("Table " + deleteTableRequest.tableName() + " does not exist", e); } } return false; diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java index b4bb290e25c6..9b453e9a1a2a 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java @@ -28,6 +28,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.sync.common.model.FieldSchema; +import lombok.NoArgsConstructor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -48,7 +49,6 @@ import software.amazon.awssdk.services.glue.model.SerDeInfo; import software.amazon.awssdk.services.glue.model.StorageDescriptor; import software.amazon.awssdk.services.glue.model.TableInput; -import java.io.IOException; import java.nio.file.Files; import java.time.Instant; import java.util.Arrays; @@ -59,6 +59,7 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; @Disabled("HUDI-7475 The tests do not work. Disabling them to unblock Azure CI") +@NoArgsConstructor public class ITTestGluePartitionPushdown { // This port number must be the same as {@code moto.port} defined in pom.xml private static final int MOTO_PORT = 5002; @@ -73,9 +74,6 @@ public class ITTestGluePartitionPushdown { private Column[] partitionsColumn = {Column.builder().name("part1").type("int").build(), Column.builder().name("part2").type("string").build()}; List<FieldSchema> partitionsFieldSchema = Arrays.asList(new FieldSchema("part1", "int"), new FieldSchema("part2", "string")); - public ITTestGluePartitionPushdown() throws IOException { - } - @BeforeEach public void setUp() throws Exception { hiveSyncProps = new TypedProperties();
