This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1a37b351d2c1 refactor: Apply lombok annotations to remove boilerplate
code in hudi-aws (#17522)
1a37b351d2c1 is described below
commit 1a37b351d2c1a6d0383b9061b3f50566828b86e6
Author: voonhous <[email protected]>
AuthorDate: Tue Dec 9 03:10:51 2025 +0800
refactor: Apply lombok annotations to remove boilerplate code in hudi-aws
(#17522)
---
hudi-aws/pom.xml | 6 ++
...dieConfigAWSAssumedRoleCredentialsProvider.java | 6 +-
.../HoodieConfigAWSCredentialsProvider.java | 8 +--
.../cloudwatch/CloudWatchMetricsReporter.java | 10 ++-
.../aws/metrics/cloudwatch/CloudWatchReporter.java | 12 ++--
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 81 ++++++++++------------
.../lock/DynamoDBBasedLockProviderBase.java | 7 +-
.../aws/transaction/lock/S3StorageLockClient.java | 7 +-
.../apache/hudi/aws/utils/DynamoTableUtils.java | 16 ++---
.../hudi/aws/sync/ITTestGluePartitionPushdown.java | 10 ++-
10 files changed, 71 insertions(+), 92 deletions(-)
diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml
index 7c0e146869b0..c013d92b581f 100644
--- a/hudi-aws/pom.xml
+++ b/hudi-aws/pom.xml
@@ -41,6 +41,12 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSAssumedRoleCredentialsProvider.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSAssumedRoleCredentialsProvider.java
index 292f4722da5b..55b16bad2b10 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSAssumedRoleCredentialsProvider.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSAssumedRoleCredentialsProvider.java
@@ -21,8 +21,7 @@ package org.apache.hudi.aws.credentials;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieAWSConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.sts.StsClient;
@@ -34,10 +33,9 @@ import java.util.Properties;
/**
* Credentials provider which assumes AWS role from Hoodie config and fetches
its credentials.
*/
+@Slf4j
public class HoodieConfigAWSAssumedRoleCredentialsProvider implements
AwsCredentialsProvider {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieConfigAWSAssumedRoleCredentialsProvider.class);
-
private final StsAssumeRoleCredentialsProvider credentialsProvider;
public HoodieConfigAWSAssumedRoleCredentialsProvider(Properties props) {
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
index 105a4ffbb649..d8afb8a2531b 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
@@ -21,8 +21,7 @@ package org.apache.hudi.aws.credentials;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieAWSConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -33,10 +32,9 @@ import java.util.Properties;
/**
* Credentials provider which fetches AWS access key from Hoodie config.
*/
+@Slf4j
public class HoodieConfigAWSCredentialsProvider implements
AwsCredentialsProvider {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieConfigAWSCredentialsProvider.class);
-
private AwsCredentials awsCredentials;
public HoodieConfigAWSCredentialsProvider(Properties props) {
@@ -45,7 +43,7 @@ public class HoodieConfigAWSCredentialsProvider implements
AwsCredentialsProvide
String sessionToken =
props.getProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key());
if (StringUtils.isNullOrEmpty(accessKey) ||
StringUtils.isNullOrEmpty(secretKey)) {
- LOG.debug("AWS access key or secret key not found in the Hudi
configuration. "
+ log.debug("AWS access key or secret key not found in the Hudi
configuration. "
+ "Use default AWS credentials");
} else {
this.awsCredentials = createCredentials(accessKey, secretKey,
sessionToken);
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchMetricsReporter.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchMetricsReporter.java
index 03f71410b36d..b47935bc3293 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchMetricsReporter.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchMetricsReporter.java
@@ -23,8 +23,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.metrics.MetricsReporter;
import com.codahale.metrics.MetricRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@@ -32,10 +31,9 @@ import java.util.concurrent.TimeUnit;
* Hudi Amazon CloudWatch metrics reporter. Responsible for reading Hoodie
metrics configurations and hooking up with
* {@link org.apache.hudi.metrics.Metrics}. Internally delegates reporting
tasks to {@link CloudWatchReporter}.
*/
+@Slf4j
public class CloudWatchMetricsReporter extends MetricsReporter {
- private static final Logger LOG =
LoggerFactory.getLogger(CloudWatchMetricsReporter.class);
-
private final MetricRegistry registry;
private final HoodieMetricsConfig metricsConfig;
private final CloudWatchReporter reporter;
@@ -70,7 +68,7 @@ public class CloudWatchMetricsReporter extends
MetricsReporter {
@Override
public void start() {
- LOG.info("Starting CloudWatch Metrics Reporter.");
+ log.info("Starting CloudWatch Metrics Reporter.");
reporter.start(metricsConfig.getCloudWatchReportPeriodSeconds(),
TimeUnit.SECONDS);
}
@@ -81,7 +79,7 @@ public class CloudWatchMetricsReporter extends
MetricsReporter {
@Override
public void stop() {
- LOG.info("Stopping CloudWatch Metrics Reporter.");
+ log.info("Stopping CloudWatch Metrics Reporter.");
reporter.stop();
}
}
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchReporter.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchReporter.java
index 037bab52db16..ba9abc55bef7 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchReporter.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/metrics/cloudwatch/CloudWatchReporter.java
@@ -32,8 +32,7 @@ import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Timer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
@@ -54,6 +53,7 @@ import java.util.concurrent.TimeUnit;
* A reporter for publishing metrics to Amazon CloudWatch. It is responsible
for collecting, converting DropWizard
* metrics to CloudWatch metrics and composing metrics payload.
*/
+@Slf4j
public class CloudWatchReporter extends ScheduledReporter {
static final String DIMENSION_TABLE_NAME_KEY = "Table";
@@ -61,8 +61,6 @@ public class CloudWatchReporter extends ScheduledReporter {
static final String DIMENSION_GAUGE_TYPE_VALUE = "gauge";
static final String DIMENSION_COUNT_TYPE_VALUE = "count";
- private static final Logger LOG =
LoggerFactory.getLogger(CloudWatchReporter.class);
-
private final CloudWatchAsyncClient cloudWatchClientAsync;
private final Clock clock;
private final String prefix;
@@ -181,7 +179,7 @@ public class CloudWatchReporter extends ScheduledReporter {
SortedMap<String, Histogram> histograms,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
- LOG.info("Reporting Metrics to CloudWatch.");
+ log.info("Reporting Metrics to CloudWatch.");
final long timestampMilliSec = clock.getTime();
List<MetricDatum> metricsData = new ArrayList<>();
@@ -234,7 +232,7 @@ public class CloudWatchReporter extends ScheduledReporter {
try {
cloudWatchFuture.get(30, TimeUnit.SECONDS);
} catch (final Exception ex) {
- LOG.error("Error reporting metrics to CloudWatch. The data in this
CloudWatch request "
+ log.error("Error reporting metrics to CloudWatch. The data in this
CloudWatch request "
+ "may have been discarded, and not made it to CloudWatch.", ex);
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -316,7 +314,7 @@ public class CloudWatchReporter extends ScheduledReporter {
try {
cloudWatchClientAsync.close();
} catch (Exception ex) {
- LOG.warn("Exception while shutting down CloudWatch client.", ex);
+ log.warn("Exception while shutting down CloudWatch client.", ex);
}
}
}
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index dab55d6b4f1d..2dc45a255a63 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -38,9 +38,9 @@ import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.parquet.schema.MessageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.glue.GlueAsyncClient;
import software.amazon.awssdk.services.glue.GlueAsyncClientBuilder;
@@ -110,14 +110,14 @@ import static
org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.ALL_PARTITIONS_READ_PARALLELISM;
import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.CHANGED_PARTITIONS_READ_PARALLELISM;
import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_METADATA_FILE_LISTING;
+import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_DATABASE_NAME;
+import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_RESOURCE_TAGS;
+import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_TABLE_NAME;
import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS;
import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.META_SYNC_PARTITION_INDEX_FIELDS_ENABLE;
import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.PARTITION_CHANGE_PARALLELISM;
import static org.apache.hudi.config.HoodieAWSConfig.AWS_GLUE_ENDPOINT;
import static org.apache.hudi.config.HoodieAWSConfig.AWS_GLUE_REGION;
-import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_DATABASE_NAME;
-import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_RESOURCE_TAGS;
-import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.GLUE_SYNC_TABLE_NAME;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
@@ -132,9 +132,8 @@ import static
org.apache.hudi.sync.common.util.TableUtils.tableId;
*
* @Experimental
*/
+@Slf4j
public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
-
- private static final Logger LOG =
LoggerFactory.getLogger(AWSGlueCatalogSyncClient.class);
private static final int MAX_PARTITIONS_PER_CHANGE_REQUEST = 100;
private static final int MAX_PARTITIONS_PER_READ_REQUEST = 1000;
private static final int MAX_DELETE_PARTITIONS_PER_REQUEST = 25;
@@ -148,7 +147,9 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
private static final String ENABLE_MDT_LISTING =
"hudi.metadata-listing-enabled";
private static final String GLUE_TABLE_ARN_FORMAT =
"arn:aws:glue:%s:%s:table/%s/%s";
private static final String GLUE_DATABASE_ARN_FORMAT =
"arn:aws:glue:%s:%s:database/%s";
+ @Getter
private final String databaseName;
+ @Getter
private final String tableName;
private final boolean skipTableArchive;
@@ -191,16 +192,6 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
}
}
- @Override
- public String getTableName() {
- return this.tableName;
- }
-
- @Override
- public String getDatabaseName() {
- return this.databaseName;
- }
-
private List<Partition> getPartitionsSegment(Segment segment, String
tableName) {
try {
List<Partition> partitions = new ArrayList<>();
@@ -259,7 +250,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
@Override
public List<Partition> getPartitionsFromList(String tableName, List<String>
partitionList) {
if (partitionList.isEmpty()) {
- LOG.info("No partitions to read for " + tableId(this.databaseName,
tableName));
+ log.info("No partitions to read for " + tableId(this.databaseName,
tableName));
return Collections.emptyList();
}
HoodieTimer timer = HoodieTimer.start();
@@ -278,7 +269,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
for (Future<List<Partition>> future : futures) {
partitions.addAll(future.get());
}
- LOG.info(
+ log.info(
"Requested {} partitions, found existing {} partitions, new {}
partitions",
partitionList.size(),
partitions.size(),
@@ -289,7 +280,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
throw new HoodieGlueSyncException("Failed to get all partitions for
table " + tableId(this.databaseName, tableName), e);
} finally {
executorService.shutdownNow();
- LOG.info("Took {} ms to get {} partitions for table {}",
timer.endTimer(), partitionList.size(), tableId(this.databaseName, tableName));
+ log.info("Took {} ms to get {} partitions for table {}",
timer.endTimer(), partitionList.size(), tableId(this.databaseName, tableName));
}
}
@@ -317,13 +308,13 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
HoodieTimer timer = HoodieTimer.start();
try {
if (partitionsToAdd.isEmpty()) {
- LOG.info("No partitions to add for " + tableId(this.databaseName,
tableName));
+ log.info("No partitions to add for " + tableId(this.databaseName,
tableName));
return;
}
Table table = getTable(awsGlue, databaseName, tableName);
parallelizeChange(partitionsToAdd, this.changeParallelism, partitions ->
this.addPartitionsToTableInternal(table, partitions),
MAX_PARTITIONS_PER_CHANGE_REQUEST);
} finally {
- LOG.info("Added {} partitions to table {} in {} ms",
partitionsToAdd.size(), tableId(this.databaseName, tableName),
timer.endTimer());
+ log.info("Added {} partitions to table {} in {} ms",
partitionsToAdd.size(), tableId(this.databaseName, tableName),
timer.endTimer());
}
}
@@ -364,7 +355,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
if (response.errors().stream()
.allMatch(
(error) ->
"AlreadyExistsException".equals(error.errorDetail().errorCode()))) {
- LOG.info("Partitions already exist in glue: {}", response.errors());
+ log.info("Partitions already exist in glue: {}", response.errors());
} else {
throw new HoodieGlueSyncException("Fail to add partitions to " +
tableId(databaseName, table.name())
+ " with error(s): " + response.errors());
@@ -380,13 +371,13 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
HoodieTimer timer = HoodieTimer.start();
try {
if (changedPartitions.isEmpty()) {
- LOG.info("No partitions to update for " + tableId(this.databaseName,
tableName));
+ log.info("No partitions to update for " + tableId(this.databaseName,
tableName));
return;
}
Table table = getTable(awsGlue, databaseName, tableName);
parallelizeChange(changedPartitions, this.changeParallelism, partitions
-> this.updatePartitionsToTableInternal(table, partitions),
MAX_PARTITIONS_PER_CHANGE_REQUEST);
} finally {
- LOG.info("Updated {} partitions to table {} in {} ms",
changedPartitions.size(), tableId(this.databaseName, tableName),
timer.endTimer());
+ log.info("Updated {} partitions to table {} in {} ms",
changedPartitions.size(), tableId(this.databaseName, tableName),
timer.endTimer());
}
}
@@ -420,12 +411,12 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
HoodieTimer timer = HoodieTimer.start();
try {
if (partitionsToDrop.isEmpty()) {
- LOG.info("No partitions to drop for " + tableId(this.databaseName,
tableName));
+ log.info("No partitions to drop for " + tableId(this.databaseName,
tableName));
return;
}
parallelizeChange(partitionsToDrop, this.changeParallelism, partitions
-> this.dropPartitionsInternal(tableName, partitions),
MAX_DELETE_PARTITIONS_PER_REQUEST);
} finally {
- LOG.info("Deleted {} partitions to table {} in {} ms",
partitionsToDrop.size(), tableId(this.databaseName, tableName),
timer.endTimer());
+ log.info("Deleted {} partitions to table {} in {} ms",
partitionsToDrop.size(), tableId(this.databaseName, tableName),
timer.endTimer());
}
}
@@ -573,7 +564,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
// TODO: skip cascading when new fields in structs are added to the
schema in last position
boolean cascade =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0 &&
!schemaDiff.getUpdateColumnTypes().isEmpty();
if (cascade) {
- LOG.info("Cascading column changes to partitions");
+ log.info("Cascading column changes to partitions");
List<String> allPartitions = getAllPartitions(tableName).stream()
.map(partition -> getStringFromPartition(table.partitionKeys(),
partition.getValues()))
.collect(Collectors.toList());
@@ -695,9 +686,9 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
.build();
CreateTableResponse response = awsGlue.createTable(request).get();
- LOG.info("Created table {} : {}", tableId(databaseName, tableName),
response);
+ log.info("Created table {} : {}", tableId(databaseName, tableName),
response);
} catch (AlreadyExistsException e) {
- LOG.warn("Table {} already exists.", tableId(databaseName, tableName),
e);
+ log.warn("Table {} already exists.", tableId(databaseName, tableName),
e);
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to create " +
tableId(databaseName, tableName), e);
}
@@ -716,14 +707,14 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
if (!config.getBooleanOrDefault(META_SYNC_PARTITION_INDEX_FIELDS_ENABLE)) {
// deactivate indexing if enabled
if (getPartitionIndexEnable(tableName)) {
- LOG.info("Deactivating partition indexing");
+ log.info("Deactivating partition indexing");
updatePartitionIndexEnable(tableName, false);
}
// also drop all existing indexes
GetPartitionIndexesRequest indexesRequest =
GetPartitionIndexesRequest.builder().databaseName(databaseName).tableName(tableName).build();
GetPartitionIndexesResponse existingIdxsResp =
awsGlue.getPartitionIndexes(indexesRequest).get();
for (PartitionIndexDescriptor idsToDelete :
existingIdxsResp.partitionIndexDescriptorList()) {
- LOG.info("Dropping partition index: {}", idsToDelete.indexName());
+ log.info("Dropping partition index: {}", idsToDelete.indexName());
DeletePartitionIndexRequest idxToDelete =
DeletePartitionIndexRequest.builder()
.databaseName(databaseName).tableName(tableName).indexName(idsToDelete.indexName()).build();
awsGlue.deletePartitionIndex(idxToDelete).get();
@@ -731,7 +722,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
} else {
// activate indexing usage if disabled
if (!getPartitionIndexEnable(tableName)) {
- LOG.info("Activating partition indexing");
+ log.info("Activating partition indexing");
updatePartitionIndexEnable(tableName, true);
}
@@ -756,7 +747,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
indexesChanges = true;
DeletePartitionIndexRequest idxToDelete =
DeletePartitionIndexRequest.builder()
.databaseName(databaseName).tableName(tableName).indexName(existingIdx.indexName()).build();
- LOG.info("Dropping irrelevant index: {}", existingIdx.indexName());
+ log.info("Dropping irrelevant index: {}", existingIdx.indexName());
awsGlue.deletePartitionIndex(idxToDelete).get();
}
}
@@ -778,7 +769,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
PartitionIndex newIdx = PartitionIndex.builder()
.indexName(newIdxName)
.keys(neededIdx).build();
- LOG.info("Creating new partition index: {}", newIdxName);
+ log.info("Creating new partition index: {}", newIdxName);
CreatePartitionIndexRequest creationRequest =
CreatePartitionIndexRequest.builder()
.databaseName(databaseName).tableName(tableName).partitionIndex(newIdx).build();
awsGlue.createPartitionIndex(creationRequest).get();
@@ -794,7 +785,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
.map(idx ->
Arrays.stream(idx.split(";"))
.collect(Collectors.toList())).collect(Collectors.toList());
if (indexes.size() > PARTITION_INDEX_MAX_NUMBER) {
- LOG.warn("Only considering first {} indexes",
PARTITION_INDEX_MAX_NUMBER);
+ log.warn("Only considering first {} indexes",
PARTITION_INDEX_MAX_NUMBER);
return indexes.subList(0, PARTITION_INDEX_MAX_NUMBER);
}
return indexes;
@@ -853,7 +844,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
return Objects.nonNull(table);
} catch (ExecutionException e) {
if (e.getCause() instanceof EntityNotFoundException) {
- LOG.info("Table not found: {}.{}", databaseName, tableName, e);
+ log.info("Table not found: {}.{}", databaseName, tableName, e);
return false;
} else {
throw new HoodieGlueSyncException("Fail to get table: " +
tableId(databaseName, tableName), e);
@@ -873,7 +864,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
return Objects.nonNull(awsGlue.getDatabase(request).get().database());
} catch (ExecutionException e) {
if (e.getCause() instanceof EntityNotFoundException) {
- LOG.info("Database not found: {}", databaseName, e);
+ log.info("Database not found: {}", databaseName, e);
return false;
} else {
throw new HoodieGlueSyncException("Fail to check if database exists "
+ databaseName, e);
@@ -900,9 +891,9 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
try {
CreateDatabaseResponse result = awsGlue.createDatabase(request).get();
tagResource(String.format(GLUE_DATABASE_ARN_FORMAT,
awsGlue.serviceClientConfiguration().region(), catalogId, databaseName));
- LOG.info("Successfully created database in AWS Glue: {}",
result.toString());
+ log.info("Successfully created database in AWS Glue: {}",
result.toString());
} catch (AlreadyExistsException e) {
- LOG.info("AWS Glue Database {} already exists", databaseName, e);
+ log.info("AWS Glue Database {} already exists", databaseName, e);
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to create database " +
databaseName, e);
}
@@ -950,7 +941,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
throw new HoodieGlueSyncException("Fail to update last sync commit
time for " + tableId(databaseName, tableName), e);
}
} else {
- LOG.info("No commit in active timeline.");
+ log.info("No commit in active timeline.");
}
try {
// as a side effect, we also refresh the partition indexes if needed
@@ -958,9 +949,9 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
// therefore we call this at each commit as a workaround
managePartitionIndexes(tableName);
} catch (ExecutionException e) {
- LOG.warn("An indexation process is currently running.", e);
+ log.warn("An indexation process is currently running.", e);
} catch (Exception e) {
- LOG.warn("Something went wrong with partition index", e);
+ log.warn("Something went wrong with partition index", e);
}
}
@@ -974,7 +965,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
try {
awsGlue.deleteTable(deleteTableRequest).get();
- LOG.info("Successfully deleted table in AWS Glue: {}.{}", databaseName,
tableName);
+ log.info("Successfully deleted table in AWS Glue: {}.{}", databaseName,
tableName);
} catch (Exception e) {
if (e instanceof InterruptedException) {
// In case {@code InterruptedException} was thrown, resetting the
interrupted flag
@@ -1040,7 +1031,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
boolean different = serdeProperties.entrySet().stream().anyMatch(e ->
!existingSerdeProperties.containsKey(e.getKey()) ||
!existingSerdeProperties.get(e.getKey()).equals(e.getValue()));
if (!different) {
- LOG.debug("Table {} serdeProperties already up to date, skip update
serde properties.", tableName);
+ log.debug("Table {} serdeProperties already up to date, skip update
serde properties.", tableName);
return false;
}
}
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
index 66c1f462afc0..c956b5371c77 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
@@ -33,6 +33,7 @@ import
com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
import com.amazonaws.services.dynamodbv2.LockItem;
import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
@@ -75,6 +76,7 @@ public abstract class DynamoDBBasedLockProviderBase
implements LockProvider<Lock
protected final AmazonDynamoDBLockClient client;
protected final String tableName;
protected final String dynamoDBPartitionKey;
+ @Getter
protected volatile LockItem lock;
protected DynamoDBBasedLockProviderBase(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf, DynamoDbClient dynamoDB)
{
@@ -156,11 +158,6 @@ public abstract class DynamoDBBasedLockProviderBase
implements LockProvider<Lock
}
}
- @Override
- public LockItem getLock() {
- return lock;
- }
-
private static DynamoDbClient getDynamoDBClient(DynamoDbBasedLockConfig
dynamoDbBasedLockConfig) {
String region = dynamoDbBasedLockConfig.getString(DYNAMODB_LOCK_REGION);
String endpointURL =
dynamoDbBasedLockConfig.contains(DYNAMODB_ENDPOINT_URL.key())
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
index c6034ca80783..4a6fa605581d 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
@@ -32,8 +32,8 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
+import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
@@ -63,10 +63,9 @@ import static
org.apache.hudi.config.StorageBasedLockConfig.VALIDITY_TIMEOUT_SEC
* S3-based distributed lock client using ETag checks (AWS SDK v2).
* See RFC: <a
href="https://github.com/apache/hudi/blob/master/rfc/rfc-91/rfc-91.md">RFC-91</a>
*/
+@Slf4j
@ThreadSafe
public class S3StorageLockClient implements StorageLockClient {
-
- private static final Logger LOG =
LoggerFactory.getLogger(S3StorageLockClient.class);
private static final int PRECONDITION_FAILURE_ERROR_CODE = 412;
private static final int NOT_FOUND_ERROR_CODE = 404;
private static final int CONDITIONAL_REQUEST_CONFLICT_ERROR_CODE = 409;
@@ -87,7 +86,7 @@ public class S3StorageLockClient implements StorageLockClient
{
* @param props The properties for the lock config, can be used to
customize client.
*/
public S3StorageLockClient(String ownerId, String lockFileUri, Properties
props) {
- this(ownerId, lockFileUri, props, createDefaultS3Client(), LOG);
+ this(ownerId, lockFileUri, props, createDefaultS3Client(), log);
}
@VisibleForTesting
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java
index b507ca2bf478..ace74a1fbc79 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java
@@ -18,8 +18,7 @@
package org.apache.hudi.aws.utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
@@ -45,14 +44,11 @@ import
software.amazon.awssdk.services.dynamodb.model.TableStatus;
* // ... start making calls to table ...
* </pre>
*/
+@Slf4j
public class DynamoTableUtils {
private static final int DEFAULT_WAIT_TIMEOUT = 20 * 60 * 1000;
private static final int DEFAULT_WAIT_INTERVAL = 10 * 1000;
- /**
- * The logging utility.
- */
- private static final Logger LOGGER =
LoggerFactory.getLogger(DynamoTableUtils.class);
/**
* Waits up to 10 minutes for a specified DynamoDB table to resolve,
@@ -225,8 +221,8 @@ public class DynamoTableUtils {
dynamo.createTable(createTableRequest);
return true;
} catch (final ResourceInUseException e) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Table " + createTableRequest.tableName() + " already
exists", e);
+ if (log.isTraceEnabled()) {
+ log.trace("Table " + createTableRequest.tableName() + " already
exists", e);
}
}
return false;
@@ -243,8 +239,8 @@ public class DynamoTableUtils {
dynamo.deleteTable(deleteTableRequest);
return true;
} catch (final ResourceNotFoundException e) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Table " + deleteTableRequest.tableName() + " does not
exist", e);
+ if (log.isTraceEnabled()) {
+ log.trace("Table " + deleteTableRequest.tableName() + " does not
exist", e);
}
}
return false;
diff --git
a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java
b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java
index b4bb290e25c6..ecc1383ad1d3 100644
---
a/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java
+++
b/hudi-aws/src/test/java/org/apache/hudi/aws/sync/ITTestGluePartitionPushdown.java
@@ -48,7 +48,6 @@ import software.amazon.awssdk.services.glue.model.SerDeInfo;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.TableInput;
-import java.io.IOException;
import java.nio.file.Files;
import java.time.Instant;
import java.util.Arrays;
@@ -65,19 +64,18 @@ public class ITTestGluePartitionPushdown {
private static final String MOTO_ENDPOINT = "http://localhost:" + MOTO_PORT;
private static final String DB_NAME = "db_name";
private static final String TABLE_NAME = "tbl_name";
- private String basePath = Files.createTempDirectory("hivesynctest" +
Instant.now().toEpochMilli()).toUri().toString();
- private String tablePath = basePath + "/" + TABLE_NAME;
+ private String basePath;
+ private String tablePath;
private TypedProperties hiveSyncProps;
private AWSGlueCatalogSyncClient glueSync;
private FileSystem fileSystem;
private Column[] partitionsColumn =
{Column.builder().name("part1").type("int").build(),
Column.builder().name("part2").type("string").build()};
List<FieldSchema> partitionsFieldSchema = Arrays.asList(new
FieldSchema("part1", "int"), new FieldSchema("part2", "string"));
- public ITTestGluePartitionPushdown() throws IOException {
- }
-
@BeforeEach
public void setUp() throws Exception {
+ basePath = Files.createTempDirectory("hivesynctest" +
Instant.now().toEpochMilli()).toUri().toString();
+ tablePath = basePath + "/" + TABLE_NAME;
hiveSyncProps = new TypedProperties();
hiveSyncProps.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "dummy");
hiveSyncProps.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "dummy");