This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 496d07e62702 refactor: Add Lombok annotations to hudi-sync modules
(#17728)
496d07e62702 is described below
commit 496d07e6270256471c9f6a5da19649978d5ef037
Author: voonhous <[email protected]>
AuthorDate: Sat Jan 10 14:54:57 2026 +0800
refactor: Add Lombok annotations to hudi-sync modules (#17728)
---
hudi-sync/hudi-adb-sync/pom.xml | 6 ++
.../java/org/apache/hudi/sync/adb/AdbSyncTool.java | 31 +++++----
.../apache/hudi/sync/adb/HoodieAdbJdbcClient.java | 56 ++++++++--------
hudi-sync/hudi-datahub-sync/pom.xml | 6 ++
.../hudi/sync/datahub/DataHubResponseLogger.java | 13 ++--
.../hudi/sync/datahub/DataHubSyncClient.java | 31 ++++-----
.../apache/hudi/sync/datahub/DataHubSyncTool.java | 17 +++--
.../hudi/sync/datahub/DataHubTableProperties.java | 22 +++----
.../sync/datahub/config/DataHubSyncConfig.java | 8 +--
.../config/HoodieDataHubDatasetIdentifier.java | 38 ++---------
.../config/TlsEnabledDataHubEmitterSupplier.java | 32 ++++-----
hudi-sync/hudi-hive-sync/pom.xml | 6 ++
.../java/org/apache/hudi/hive/HiveSyncTool.java | 77 ++++++++++------------
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 17 +++--
.../org/apache/hudi/hive/ddl/HMSDDLExecutor.java | 40 ++++++-----
.../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 20 +++---
.../org/apache/hudi/hive/ddl/JDBCExecutor.java | 22 +++----
.../hudi/hive/ddl/QueryBasedDDLExecutor.java | 18 +++--
.../hudi/hive/replication/GlobalHiveSyncTool.java | 13 ++--
.../replication/HiveSyncGlobalCommitParams.java | 9 +--
.../hive/replication/HiveSyncGlobalCommitTool.java | 17 +++--
.../hive/replication/ReplicationStateSync.java | 6 +-
.../lock/HiveMetastoreBasedLockProvider.java | 26 +++-----
.../apache/hudi/hive/util/HivePartitionUtil.java | 7 +-
.../org/apache/hudi/hive/util/HiveSchemaUtil.java | 11 ++--
.../hudi/hive/testutils/HiveTestService.java | 33 ++++------
.../apache/hudi/hive/testutils/HiveTestUtil.java | 22 +++----
hudi-sync/hudi-sync-common/pom.xml | 6 ++
.../org/apache/hudi/hive/SchemaDifference.java | 17 ++---
.../apache/hudi/sync/common/HoodieSyncClient.java | 18 ++---
.../apache/hudi/sync/common/HoodieSyncConfig.java | 32 +++------
.../sync/common/metrics/HoodieMetaSyncMetrics.java | 13 ++--
.../apache/hudi/sync/common/model/FieldSchema.java | 37 ++---------
.../apache/hudi/sync/common/model/Partition.java | 18 ++---
.../hudi/sync/common/util/ManifestFileWriter.java | 21 +++---
.../hudi/sync/common/util/SyncUtilHelpers.java | 7 +-
36 files changed, 318 insertions(+), 455 deletions(-)
diff --git a/hudi-sync/hudi-adb-sync/pom.xml b/hudi-sync/hudi-adb-sync/pom.xml
index a7b1167a4144..a050df64e61e 100644
--- a/hudi-sync/hudi-adb-sync/pom.xml
+++ b/hudi-sync/hudi-adb-sync/pom.xml
@@ -110,6 +110,12 @@
<artifactId>slf4j-api</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
diff --git
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
index 5357af384f0e..81b5e39d4ff1 100644
---
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
+++
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
@@ -33,11 +33,10 @@ import
org.apache.hudi.sync.common.model.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import com.beust.jcommander.JCommander;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -70,9 +69,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
* This utility will get the schema from the latest commit and will sync ADB
table schema,
* incremental partitions will be synced as well.
*/
+@Slf4j
@SuppressWarnings("WeakerAccess")
public class AdbSyncTool extends HoodieSyncTool {
- private static final Logger LOG = LoggerFactory.getLogger(AdbSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
@@ -144,7 +143,7 @@ public class AdbSyncTool extends HoodieSyncTool {
}
private void syncHoodieTable(String tableName, boolean
useRealtimeInputFormat, boolean readAsOptimized) throws Exception {
- LOG.info("Try to sync hoodie table, tableName:{}, path:{}, tableType:{}",
+ log.info("Try to sync hoodie table, tableName: {}, path: {}, tableType:
{}",
tableName, syncClient.getBasePath(), syncClient.getTableType());
if (config.getBoolean(ADB_SYNC_AUTO_CREATE_DATABASE)) {
@@ -163,7 +162,7 @@ public class AdbSyncTool extends HoodieSyncTool {
}
if (config.getBoolean(ADB_SYNC_DROP_TABLE_BEFORE_CREATION)) {
- LOG.info("Drop table before creation, tableName:{}", tableName);
+ log.info("Drop table before creation, tableName: {}", tableName);
syncClient.dropTable(tableName);
}
@@ -174,14 +173,14 @@ public class AdbSyncTool extends HoodieSyncTool {
// Sync schema if needed
syncSchema(tableName, tableExists, useRealtimeInputFormat,
readAsOptimized, schema);
- LOG.info("Sync schema complete, start syncing partitions for table:{}",
tableName);
+ log.info("Sync schema complete, start syncing partitions for table: {}",
tableName);
// Get the last time we successfully synced partitions
Option<String> lastCommitTimeSynced = Option.empty();
if (tableExists) {
lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName);
}
- LOG.info("Last commit time synced was found:{}",
lastCommitTimeSynced.orElse("null"));
+ log.info("Last commit time synced was found: {}",
lastCommitTimeSynced.orElse("null"));
// Scan synced partitions
List<String> writtenPartitionsSince;
@@ -190,7 +189,7 @@ public class AdbSyncTool extends HoodieSyncTool {
} else {
writtenPartitionsSince =
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced, Option.empty());
}
- LOG.info("Scan partitions complete, partitionNum:{}",
writtenPartitionsSince.size());
+ log.info("Scan partitions complete, partitionNum: {}",
writtenPartitionsSince.size());
// Sync the partitions if needed
syncPartitions(tableName, writtenPartitionsSince);
@@ -200,7 +199,7 @@ public class AdbSyncTool extends HoodieSyncTool {
if (!config.getBoolean(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC)) {
syncClient.updateLastCommitTimeSynced(tableName);
}
- LOG.info("Sync complete for table:{}", tableName);
+ log.info("Sync complete for table: {}", tableName);
}
/**
@@ -224,13 +223,13 @@ public class AdbSyncTool extends HoodieSyncTool {
Map<String, String> sparkSerdeProperties =
SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized,
config.getString(META_SYNC_BASE_PATH));
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
- LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{},
tableProperties:{}, sederProperties:{}",
+ log.info("Sync as spark datasource table, tableName: {}, tableExists:
{}, tableProperties: {}, serdeProperties: {}",
tableName, tableExists, tableProperties, serdeProperties);
}
// Check and sync schema
if (!tableExists) {
- LOG.info("ADB table [{}] is not found, creating it", tableName);
+ log.info("ADB table [{}] is not found, creating it", tableName);
String inputFormatClassName =
HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET,
useRealTimeInputFormat);
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
@@ -244,10 +243,10 @@ public class AdbSyncTool extends HoodieSyncTool {
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema,
tableSchema, config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
config.getBoolean(ADB_SYNC_SUPPORT_TIMESTAMP));
if (!schemaDiff.isEmpty()) {
- LOG.info("Schema difference found for table:{}", tableName);
+ log.info("Schema difference found for table: {}", tableName);
syncClient.updateTableDefinition(tableName, schemaDiff);
} else {
- LOG.info("No Schema difference for table:{}", tableName);
+ log.info("No Schema difference for table: {}", tableName);
}
}
}
@@ -259,17 +258,17 @@ public class AdbSyncTool extends HoodieSyncTool {
private void syncPartitions(String tableName, List<String>
writtenPartitionsSince) {
try {
if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
- LOG.info("Not a partitioned table.");
+ log.info("Not a partitioned table.");
return;
}
Map<List<String>, String> partitions =
syncClient.scanTablePartitions(tableName);
List<PartitionEvent> partitionEvents =
syncClient.getPartitionEvents(partitions, writtenPartitionsSince);
List<String> newPartitions = filterPartitions(partitionEvents,
PartitionEventType.ADD);
- LOG.info("New Partitions:{}", newPartitions);
+ log.info("New Partitions: {}", newPartitions);
syncClient.addPartitionsToTable(tableName, newPartitions);
List<String> updatePartitions = filterPartitions(partitionEvents,
PartitionEventType.UPDATE);
- LOG.info("Changed Partitions:{}", updatePartitions);
+ log.info("Changed Partitions: {}", updatePartitions);
syncClient.updatePartitionsToTable(tableName, updatePartitions);
} catch (Exception e) {
throw new HoodieAdbSyncException("Failed to sync partitions for table:"
+ tableName, e);
diff --git
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
index 202e9b482de5..2c7ea386ced5 100644
---
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
+++
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
@@ -31,9 +31,8 @@ import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.PartitionEvent;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
@@ -56,10 +55,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+@Slf4j
public class HoodieAdbJdbcClient extends HoodieSyncClient {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieAdbJdbcClient.class);
-
public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
// Make sure we have the jdbc driver in classpath
private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
@@ -82,7 +80,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
this.config = config;
this.databaseName = config.getString(META_SYNC_DATABASE_NAME);
createAdbConnection();
- LOG.info("Init adb jdbc client success, jdbcUrl:{}",
config.getString(ADB_SYNC_JDBC_URL));
+ log.info("Init adb jdbc client success, jdbcUrl: {}",
config.getString(ADB_SYNC_JDBC_URL));
}
private void createAdbConnection() {
@@ -90,7 +88,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
- LOG.error("Unable to load jdbc driver class", e);
+ log.error("Unable to load jdbc driver class", e);
return;
}
try {
@@ -109,18 +107,18 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient
{
String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String,
String> tableProperties) {
try {
- LOG.info("Creating table:{}", tableName);
+ log.info("Creating table: {}", tableName);
String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName,
storageSchema,
config, inputFormatClass, outputFormatClass, serdeClass,
serdeProperties, tableProperties);
executeAdbSql(createSQLQuery);
} catch (IOException e) {
- throw new HoodieException("Fail to create table:" + tableName, e);
+ throw new HoodieException("Fail to create table: " + tableName, e);
}
}
@Override
public void dropTable(String tableName) {
- LOG.info("Dropping table:{}", tableName);
+ log.info("Dropping table: {}", tableName);
String dropTable = "drop table if exists `" + databaseName + "`.`" +
tableName + "`";
executeAdbSql(dropTable);
}
@@ -145,7 +143,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
}
return schema;
} catch (SQLException e) {
- throw new HoodieException("Fail to get table schema:" + tableName, e);
+ throw new HoodieException("Fail to get table schema: " + tableName, e);
} finally {
closeQuietly(result, null);
}
@@ -154,11 +152,11 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient
{
@Override
public void addPartitionsToTable(String tableName, List<String>
partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
- LOG.info("No partitions to add for table:{}", tableName);
+ log.info("No partitions to add for table: {}", tableName);
return;
}
- LOG.info("Adding partitions to table:{}, partitionNum:{}", tableName,
partitionsToAdd.size());
+ log.info("Adding partitions to table: {}, partitionNum: {}", tableName,
partitionsToAdd.size());
String sql = constructAddPartitionsSql(tableName, partitionsToAdd);
executeAdbSql(sql);
}
@@ -167,10 +165,10 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient
{
Statement stmt = null;
try {
stmt = connection.createStatement();
- LOG.info("Executing sql:{}", sql);
+ log.info("Executing sql: {}", sql);
stmt.execute(sql);
} catch (SQLException e) {
- throw new HoodieException("Fail to execute sql:" + sql, e);
+ throw new HoodieException("Fail to execute sql: " + sql, e);
} finally {
closeQuietly(null, stmt);
}
@@ -180,10 +178,10 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient
{
Statement stmt = null;
try {
stmt = connection.createStatement();
- LOG.info("Executing sql:{}", sql);
+ log.info("Executing sql: {}", sql);
return function.apply(stmt.executeQuery(sql));
} catch (SQLException e) {
- throw new HoodieException("Fail to execute sql:" + sql, e);
+ throw new HoodieException("Fail to execute sql: " + sql, e);
} finally {
closeQuietly(null, stmt);
}
@@ -191,7 +189,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
public void createDatabase(String databaseName) {
String rootPath = config.getDatabasePath();
- LOG.info("Creating database:{}, databaseLocation:{}", databaseName,
rootPath);
+ log.info("Creating database: {}, databaseLocation: {}", databaseName,
rootPath);
String sql = constructCreateDatabaseSql(rootPath);
executeAdbSql(sql);
}
@@ -205,7 +203,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
if (e.getMessage().contains("Unknown database `" + databaseName +
"`")) {
return false;
} else {
- throw new HoodieException("Fail to execute sql:" + sql, e);
+ throw new HoodieException("Fail to execute sql: " + sql, e);
}
}
};
@@ -219,7 +217,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
try {
return resultSet.next();
} catch (Exception e) {
- throw new HoodieException("Fail to execute sql:" + sql, e);
+ throw new HoodieException("Fail to execute sql: " + sql, e);
}
};
return executeQuerySQL(sql, transform);
@@ -253,7 +251,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
}
return Option.empty();
} catch (Exception e) {
- throw new HoodieException("Fail to execute sql:" + sql, e);
+ throw new HoodieException("Fail to execute sql: " + sql, e);
}
};
return executeQuerySQL(sql, transform);
@@ -267,7 +265,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
String sql = constructUpdateTblPropertiesSql(tableName,
lastCommitSynced);
executeAdbSql(sql);
} catch (Exception e) {
- throw new HoodieHiveSyncException("Fail to get update last commit time
synced:" + lastCommitSynced, e);
+ throw new HoodieHiveSyncException("Fail to get update last commit time
synced: " + lastCommitSynced, e);
}
}
@@ -294,11 +292,11 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient
{
@Override
public void updatePartitionsToTable(String tableName, List<String>
changedPartitions) {
if (changedPartitions.isEmpty()) {
- LOG.info("No partitions to change for table:{}", tableName);
+ log.info("No partitions to change for table: {}", tableName);
return;
}
- LOG.info("Changing partitions on table:{}, changedPartitionNum:{}",
tableName, changedPartitions.size());
+ log.info("Changing partitions on table: {}, changedPartitionNum: {}",
tableName, changedPartitions.size());
List<String> sqlList = constructChangePartitionsSql(tableName,
changedPartitions);
for (String sql : sqlList) {
executeAdbSql(sql);
@@ -332,7 +330,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
}
}
} catch (Exception e) {
- throw new HoodieException("Fail to execute sql:" + sql, e);
+ throw new HoodieException("Fail to execute sql: " + sql, e);
}
return partitions;
};
@@ -343,12 +341,12 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient
{
* TODO align with {@link
org.apache.hudi.sync.common.HoodieMetaSyncOperations#updateTableSchema}
*/
public void updateTableDefinition(String tableName, SchemaDifference
schemaDiff) {
- LOG.info("Adding columns for table:{}", tableName);
+ log.info("Adding columns for table: {}", tableName);
schemaDiff.getAddColumnTypes().forEach((columnName, columnType) ->
executeAdbSql(constructAddColumnSql(tableName, columnName, columnType))
);
- LOG.info("Updating columns' definition for table:{}", tableName);
+ log.info("Updating columns' definition for table: {}", tableName);
schemaDiff.getUpdateColumnTypes().forEach((columnName, columnType) ->
executeAdbSql(constructChangeColumnSql(tableName, columnName,
columnType))
);
@@ -483,7 +481,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
stmt.close();
}
} catch (SQLException e) {
- LOG.warn("Could not close the statement opened ", e);
+ log.warn("Could not close the statement opened ", e);
}
try {
@@ -491,7 +489,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
resultSet.close();
}
} catch (SQLException e) {
- LOG.warn("Could not close the resultset opened ", e);
+ log.warn("Could not close the resultset opened ", e);
}
}
@@ -502,7 +500,7 @@ public class HoodieAdbJdbcClient extends HoodieSyncClient {
connection.close();
}
} catch (SQLException e) {
- LOG.error("Fail to close connection", e);
+ log.error("Fail to close connection", e);
}
}
}
diff --git a/hudi-sync/hudi-datahub-sync/pom.xml
b/hudi-sync/hudi-datahub-sync/pom.xml
index 2063710dd115..9d485a57227f 100644
--- a/hudi-sync/hudi-datahub-sync/pom.xml
+++ b/hudi-sync/hudi-datahub-sync/pom.xml
@@ -78,6 +78,12 @@
<artifactId>parquet-avro</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
index afff4f5ddf60..da9c8dbd460f 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
@@ -21,27 +21,26 @@ package org.apache.hudi.sync.datahub;
import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
/**
* Handle responses to requests to Datahub Metastore. Just logs them.
*/
+@Slf4j
public class DataHubResponseLogger implements Callback {
- private static final Logger LOG =
LoggerFactory.getLogger(DataHubResponseLogger.class);
@Override
public void onCompletion(MetadataWriteResponse response) {
- LOG.info("Completed DataHub RestEmitter request. Status: {}",
(response.isSuccess() ? " succeeded" : " failed"));
+ log.info("Completed DataHub RestEmitter request. Status: {}",
(response.isSuccess() ? " succeeded" : " failed"));
if (!response.isSuccess()) {
- LOG.error("Request failed. {}", response);
+ log.error("Request failed. {}", response);
}
- LOG.debug("Response details: {}", response);
+ log.debug("Response details: {}", response);
}
@Override
public void onFailure(Throwable e) {
- LOG.error("Error during Datahub RestEmitter request", e);
+ log.error("Error during Datahub RestEmitter request", e);
}
}
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 10cdd16a768c..14671e30bfc1 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -52,8 +52,8 @@ import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import io.datahubproject.schematron.converters.avro.AvroSchemaConverter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -69,17 +69,18 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+@Slf4j
public class DataHubSyncClient extends HoodieSyncClient {
- private static final Logger LOG =
LoggerFactory.getLogger(DataHubSyncClient.class);
-
protected final DataHubSyncConfig config;
private final DataPlatformUrn dataPlatformUrn;
private final Option<String> dataPlatformInstance;
private final Option<Urn> dataPlatformInstanceUrn;
private final DatasetUrn datasetUrn;
private final Urn databaseUrn;
+ @Getter
private final String tableName;
+ @Getter
private final String databaseName;
private static final Status SOFT_DELETE_FALSE = new
Status().setRemoved(false);
@@ -97,16 +98,6 @@ public class DataHubSyncClient extends HoodieSyncClient {
this.databaseName = datasetIdentifier.getDatabaseName();
}
- @Override
- public String getDatabaseName() {
- return this.databaseName;
- }
-
- @Override
- public String getTableName() {
- return this.tableName;
- }
-
@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
throw new UnsupportedOperationException("Not supported:
`getLastCommitTimeSynced`");
@@ -126,14 +117,14 @@ public class DataHubSyncClient extends HoodieSyncClient {
if (lastCommitTime.isPresent()) {
updateTableProperties(tableName,
Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTime.get()));
} else {
- LOG.error("Failed to get last commit time");
+ log.error("Failed to get last commit time");
}
Option<String> lastCommitCompletionTime = getLastCommitCompletionTime();
if (lastCommitCompletionTime.isPresent()) {
updateTableProperties(tableName,
Collections.singletonMap(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
lastCommitCompletionTime.get()));
} else {
- LOG.error("Failed to get last commit completion time");
+ log.error("Failed to get last commit completion time");
}
}
@@ -163,7 +154,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
throw new HoodieDataHubSyncException(
"Failed to sync properties for Dataset " + datasetUrn + ": " +
tableProperties, e);
} else {
- LOG.error("Failed to sync properties for Dataset {}: {}", datasetUrn,
tableProperties, e);
+ log.error("Failed to sync properties for Dataset {}: {}", datasetUrn,
tableProperties, e);
return false;
}
}
@@ -206,7 +197,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
throw new HoodieDataHubSyncException("Failed to sync " +
failures.size() + " operations", failures.get(0));
} else {
for (Throwable failure : failures) {
- LOG.error("Failed to sync operation", failure);
+ log.error("Failed to sync operation", failure);
}
}
}
@@ -214,7 +205,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
if (!config.suppressExceptions()) {
throw new HoodieDataHubSyncException(String.format("Failed to sync
metadata for dataset %s", tableName), e);
} else {
- LOG.error("Failed to sync metadata for dataset {}", tableName, e);
+ log.error("Failed to sync metadata for dataset {}", tableName, e);
}
}
}
@@ -266,7 +257,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
.build();
return attachDomainProposal;
} catch (URISyntaxException e) {
- LOG.warn("Failed to create domain URN from string: {}",
config.getDomainIdentifier());
+ log.warn("Failed to create domain URN from string: {}",
config.getDomainIdentifier());
}
return null;
}
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
index 951490c23047..8a89773e9de6 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
@@ -27,9 +27,8 @@ import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
import com.beust.jcommander.JCommander;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
@@ -45,8 +44,8 @@ import static
org.apache.hudi.sync.datahub.DataHubTableProperties.getTableProper
* @Experimental
* @see <a href="https://datahubproject.io/">https://datahubproject.io/</a>
*/
+@Slf4j
public class DataHubSyncTool extends HoodieSyncTool {
- private static final Logger LOG =
LoggerFactory.getLogger(DataHubSyncTool.class);
protected final DataHubSyncConfig config;
protected final HoodieTableMetaClient metaClient;
@@ -68,14 +67,14 @@ public class DataHubSyncTool extends HoodieSyncTool {
@Override
public void syncHoodieTable() {
try {
- LOG.info("Syncing target Hoodie table with DataHub dataset({}). DataHub
URL: {}, basePath: {}",
+ log.info("Syncing target Hoodie table with DataHub dataset({}). DataHub
URL: {}, basePath: {}",
tableName, config.getDataHubServerEndpoint(),
config.getString(META_SYNC_BASE_PATH));
syncSchema();
syncTableProperties();
updateLastCommitTimeIfNeeded();
- LOG.info("Sync completed for table {}", tableName);
+ log.info("Sync completed for table {}", tableName);
} catch (Exception e) {
throw new RuntimeException("Failed to sync table " + tableName + " to
DataHub", e);
} finally {
@@ -85,7 +84,7 @@ public class DataHubSyncTool extends HoodieSyncTool {
private void syncSchema() {
syncClient.updateTableSchema(tableName, null, null);
- LOG.info("Schema synced for table {}", tableName);
+ log.info("Schema synced for table {}", tableName);
}
private void syncTableProperties() {
@@ -93,14 +92,14 @@ public class DataHubSyncTool extends HoodieSyncTool {
HoodieTableMetadata tableMetadata = new HoodieTableMetadata(metaClient,
storageSchema);
Map<String, String> tableProperties = getTableProperties(config,
tableMetadata);
syncClient.updateTableProperties(tableName, tableProperties);
- LOG.info("Properties synced for table {}", tableName);
+ log.info("Properties synced for table {}", tableName);
}
private void updateLastCommitTimeIfNeeded() {
boolean shouldUpdateLastCommitTime =
!config.getBoolean(META_SYNC_CONDITIONAL_SYNC);
if (shouldUpdateLastCommitTime) {
syncClient.updateLastCommitTimeSynced(tableName);
- LOG.info("Updated last sync time for table {}", tableName);
+ log.info("Updated last sync time for table {}", tableName);
}
}
@@ -111,7 +110,7 @@ public class DataHubSyncTool extends HoodieSyncTool {
syncClient.close();
syncClient = null;
} catch (Exception e) {
- LOG.error("Error closing DataHub sync client", e);
+ log.error("Error closing DataHub sync client", e);
}
}
}
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java
index 78f8ec43984e..849ea576d8d3 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java
@@ -27,8 +27,9 @@ import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
@@ -43,10 +44,9 @@ import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_BA
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_PARTITION_FIELDS;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_SPARK_VERSION;
+@Slf4j
public class DataHubTableProperties {
- private static final Logger LOG =
LoggerFactory.getLogger(DataHubTableProperties.class);
-
public static final String HOODIE_META_SYNC_DATAHUB_TABLE_PROPERTIES =
"hoodie.meta.sync.datahub.table.properties";
public static final String HUDI_TABLE_TYPE = "hudi.table.type";
public static final String HUDI_TABLE_VERSION = "hudi.table.version";
@@ -107,19 +107,17 @@ public class DataHubTableProperties {
serdeProperties.put("serdeClass", serDeFormatClassName);
Map<String, String> sparkSerdeProperties =
SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized,
config.getString(META_SYNC_BASE_PATH));
sparkSerdeProperties.forEach((k, v) ->
serdeProperties.putIfAbsent(k.startsWith("spark.") ? k : "spark." + k, v));
- LOG.info("Serde Properties : {}", serdeProperties);
+ log.info("Serde Properties: {}", serdeProperties);
return serdeProperties;
}
+ @AllArgsConstructor
public static class HoodieTableMetadata {
+
private final HoodieTableMetaClient metaClient;
+ @Getter
private final HoodieSchema schema;
- public HoodieTableMetadata(HoodieTableMetaClient metaClient, HoodieSchema
schema) {
- this.metaClient = metaClient;
- this.schema = schema;
- }
-
public String getTableType() {
return metaClient.getTableType().name();
}
@@ -127,9 +125,5 @@ public class DataHubTableProperties {
public String getTableVersion() {
return metaClient.getTableConfig().getTableVersion().toString();
}
-
- public HoodieSchema getSchema() {
- return schema;
- }
}
}
\ No newline at end of file
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
index b235bb5681eb..1b79ec151286 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
@@ -30,8 +30,7 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import datahub.client.rest.RestEmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.concurrent.Immutable;
@@ -44,13 +43,12 @@ import static
org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier
import static
org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier.DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME;
@Immutable
+@Slf4j
@ConfigClassProperty(name = "DataHub Sync Configs",
groupName = ConfigGroups.Names.META_SYNC,
description = "Configurations used by the Hudi to sync metadata to
DataHub.")
public class DataHubSyncConfig extends HoodieSyncConfig {
- private static final Logger LOG =
LoggerFactory.getLogger(DataHubSyncConfig.class);
-
public static final ConfigProperty<String>
META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS = ConfigProperty
.key("hoodie.meta.sync.datahub.dataset.identifier.class")
.defaultValue(HoodieDataHubDatasetIdentifier.class.getName())
@@ -181,7 +179,7 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
super(props);
// Log warning if the domain identifier is provided but is not in urn form
if (contains(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER) &&
!getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER).startsWith("urn:li:domain:")) {
- LOG.warn(
+ log.warn(
"Domain identifier must be in urn form (e.g.,
urn:li:domain:_domain_id). Provided {}. Will remove this from configuration.",
getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER));
this.props.remove(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key());
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index b7304491a848..ab27b5936224 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -26,11 +26,13 @@ import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import io.datahubproject.models.util.DatabaseKey;
+import lombok.AccessLevel;
+import lombok.Getter;
import java.util.Properties;
-import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATABASE_NAME;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_TABLE_NAME;
@@ -40,11 +42,13 @@ import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DA
* <p>
* Extend this to customize the way of constructing {@link DatasetUrn}.
*/
+@Getter
public class HoodieDataHubDatasetIdentifier {
public static final String DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME = "hudi";
public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV;
+ @Getter(AccessLevel.NONE)
protected final Properties props;
private final String dataPlatform;
private final DataPlatformUrn dataPlatformUrn;
@@ -87,38 +91,6 @@ public class HoodieDataHubDatasetIdentifier {
this.databaseUrn = databaseKey.asUrn();
}
- public DatasetUrn getDatasetUrn() {
- return this.datasetUrn;
- }
-
- public String getDataPlatform() {
- return this.dataPlatform;
- }
-
- public DataPlatformUrn getDataPlatformUrn() {
- return this.dataPlatformUrn;
- }
-
- public Option<String> getDataPlatformInstance() {
- return this.dataPlatformInstance;
- }
-
- public Option<Urn> getDataPlatformInstanceUrn() {
- return this.dataPlatformInstanceUrn;
- }
-
- public Urn getDatabaseUrn() {
- return this.databaseUrn;
- }
-
- public String getTableName() {
- return this.tableName;
- }
-
- public String getDatabaseName() {
- return this.databaseName;
- }
-
private static DataPlatformUrn createDataPlatformUrn(String platformUrn) {
return new DataPlatformUrn(platformUrn);
}
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
index 042182fd68c7..1746ea9f9e9b 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/TlsEnabledDataHubEmitterSupplier.java
@@ -19,18 +19,19 @@
package org.apache.hudi.sync.datahub.config;
-import datahub.client.rest.RestEmitter;
-import datahub.shaded.org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
-import
datahub.shaded.org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.sync.datahub.HoodieDataHubSyncException;
+
+import datahub.client.rest.RestEmitter;
+import
datahub.shaded.org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import datahub.shaded.org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import lombok.extern.slf4j.Slf4j;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
+
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -44,10 +45,9 @@ import java.util.Collection;
* This class reads TLS configuration from Hudi properties and creates a
RestEmitter
* with proper SSL/TLS context for secure communication with DataHub servers.
*/
+@Slf4j
public class TlsEnabledDataHubEmitterSupplier implements
DataHubEmitterSupplier {
- private static final Logger LOG =
LoggerFactory.getLogger(TlsEnabledDataHubEmitterSupplier.class);
-
private final TypedProperties config;
public TlsEnabledDataHubEmitterSupplier(TypedProperties config) {
@@ -70,7 +70,7 @@ public class TlsEnabledDataHubEmitterSupplier implements
DataHubEmitterSupplier
String truststorePath = ConfigUtils.getStringWithAltKeys(config,
DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PATH, true);
String truststorePassword = ConfigUtils.getStringWithAltKeys(config,
DataHubSyncConfig.META_SYNC_DATAHUB_TLS_TRUSTSTORE_PASSWORD, true);
- LOG.info("Creating DataHub RestEmitter with TLS configuration for
server: {}", serverUrl);
+ log.info("Creating DataHub RestEmitter with TLS configuration for
server: {}", serverUrl);
return RestEmitter.create(builder -> {
builder.server(serverUrl);
@@ -81,7 +81,7 @@ public class TlsEnabledDataHubEmitterSupplier implements
DataHubEmitterSupplier
// Configure TLS/SSL context if any TLS configuration is provided
if (hasTlsConfiguration(caCertPath, keystorePath, truststorePath)) {
- LOG.info("Configuring TLS for DataHub connection");
+ log.info("Configuring TLS for DataHub connection");
SSLContext sslContext = createSSLContext(caCertPath, keystorePath,
keystorePassword, truststorePath, truststorePassword);
builder.customizeHttpAsyncClient(httpClientBuilder -> {
@@ -94,7 +94,7 @@ public class TlsEnabledDataHubEmitterSupplier implements
DataHubEmitterSupplier
httpClientBuilder.setConnectionManager(connectionManagerBuilder.build());
});
- LOG.info("Successfully configured TLS for DataHub connection");
+ log.info("Successfully configured TLS for DataHub connection");
}
});
} catch (Exception e) {
@@ -119,9 +119,9 @@ public class TlsEnabledDataHubEmitterSupplier implements
DataHubEmitterSupplier
throw new HoodieDataHubSyncException("Keystore file not found: " +
keystorePath);
}
if (keystorePassword == null || keystorePassword.isEmpty()) {
- LOG.warn("No password provided for keystore {}. Using empty password
- consider using password-protected keystores for better security.",
keystorePath);
+ log.warn("No password provided for keystore {}. Using empty password
- consider using password-protected keystores for better security.",
keystorePath);
}
- LOG.info("Loading keystore from: {}", keystorePath);
+ log.info("Loading keystore from: {}", keystorePath);
KeyStore keyStore = KeyStore.getInstance("PKCS12");
char[] keystorePasswordChars = (keystorePassword != null &&
!keystorePassword.isEmpty())
? keystorePassword.toCharArray() : new char[0];
@@ -137,9 +137,9 @@ public class TlsEnabledDataHubEmitterSupplier implements
DataHubEmitterSupplier
throw new HoodieDataHubSyncException("Truststore file not found: " +
truststorePath);
}
if (truststorePassword == null || truststorePassword.isEmpty()) {
- LOG.warn("No password provided for truststore {}. Using empty
password - consider using password-protected truststores for better security.",
truststorePath);
+ log.warn("No password provided for truststore {}. Using empty
password - consider using password-protected truststores for better security.",
truststorePath);
}
- LOG.info("Loading truststore from: {}", truststorePath);
+ log.info("Loading truststore from: {}", truststorePath);
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
char[] truststorePasswordChars = (truststorePassword != null &&
!truststorePassword.isEmpty())
? truststorePassword.toCharArray() : new char[0];
@@ -151,7 +151,7 @@ public class TlsEnabledDataHubEmitterSupplier implements
DataHubEmitterSupplier
if (!Files.exists(Paths.get(caCertPath))) {
throw new HoodieDataHubSyncException("CA certificate file not found:
" + caCertPath);
}
- LOG.info("Loading CA certificate from: {}", caCertPath);
+ log.info("Loading CA certificate from: {}", caCertPath);
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null, null);
@@ -163,7 +163,7 @@ public class TlsEnabledDataHubEmitterSupplier implements
DataHubEmitterSupplier
trustStore.setCertificateEntry("ca-cert-" + certIndex, caCert);
certIndex++;
}
- LOG.info("Loaded {} CA certificate(s) from: {}", caCerts.size(),
caCertPath);
+ log.info("Loaded {} CA certificate(s) from: {}", caCerts.size(),
caCertPath);
}
sslContextBuilder.loadTrustMaterial(trustStore, null);
}
diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml
index 1874974e3661..8732901d5cac 100644
--- a/hudi-sync/hudi-hive-sync/pom.xml
+++ b/hudi-sync/hudi-hive-sync/pom.xml
@@ -43,6 +43,12 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<!-- Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 9cab763f8d57..d91d4d746ce0 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -38,10 +38,10 @@ import
org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import com.beust.jcommander.JCommander;
import com.codahale.metrics.Timer;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
@@ -85,15 +85,17 @@ import static
org.apache.hudi.sync.common.util.TableUtils.tableId;
* This utility will get the schema from the latest commit and will sync hive
table schema Also this will sync the
* partitions incrementally (all the partitions modified since the last commit)
*/
+@Slf4j
@SuppressWarnings("WeakerAccess")
public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
protected HiveSyncConfig config;
+ @Getter
private String databaseName;
+ @Getter
private String tableName;
protected HoodieSyncClient syncClient;
@@ -136,7 +138,7 @@ public class HiveSyncTool extends HoodieSyncTool implements
AutoCloseable {
this.syncClient = new HoodieHiveSyncClient(config, metaClient);
} catch (RuntimeException e) {
if (config.getBoolean(HIVE_IGNORE_EXCEPTIONS)) {
- LOG.error("Got runtime exception when hive syncing, but continuing as
ignoreExceptions config is set ", e);
+ log.error("Got runtime exception when hive syncing, but continuing as
ignoreExceptions config is set ", e);
} else {
throw new HoodieHiveSyncException("Got runtime exception when hive
syncing", e);
}
@@ -158,29 +160,20 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
: Option.of(tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
- LOG.error("Unknown table type " + syncClient.getTableType());
+ log.error("Unknown table type {}", syncClient.getTableType());
throw new InvalidTableException(syncClient.getBasePath());
}
}
}
- public String getTableName() {
- return this.tableName;
- }
-
- public String getDatabaseName() {
- return this.databaseName;
- }
-
@Override
public void syncHoodieTable() {
try {
if (syncClient != null) {
- LOG.info("Syncing target hoodie table with hive table("
- + tableId(databaseName, tableName) + "). Hive metastore URL from
HiveConf:"
- +
config.getHiveConf().get(HiveConf.ConfVars.METASTOREURIS.varname) + "). Hive
metastore URL from HiveSyncConfig:"
- + config.getString(METASTORE_URIS) + ", basePath :"
- + config.getString(META_SYNC_BASE_PATH));
+ log.info("Syncing target hoodie table with hive table({}). Hive
metastore URL from HiveConf:{}). "
+ + "Hive metastore URL from HiveSyncConfig:{}, basePath :{}",
+ tableId(databaseName, tableName),
config.getHiveConf().get(HiveConf.ConfVars.METASTOREURIS.varname),
+ config.getString(METASTORE_URIS),
config.getString(META_SYNC_BASE_PATH));
doSync();
}
@@ -221,7 +214,7 @@ public class HiveSyncTool extends HoodieSyncTool implements
AutoCloseable {
}
break;
default:
- LOG.error("Unknown table type " + syncClient.getTableType());
+ log.error("Unknown table type {}", syncClient.getTableType());
throw new InvalidTableException(syncClient.getBasePath());
}
}
@@ -241,20 +234,19 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
}
protected void syncHoodieTable(String tableName, boolean
useRealtimeInputFormat, boolean readAsOptimized) {
- LOG.info("Trying to sync hoodie table " + tableName + " with base path " +
syncClient.getBasePath()
- + " of type " + syncClient.getTableType());
+ log.info("Trying to sync hoodie table {} with base path {} of type {}",
tableName, syncClient.getBasePath(), syncClient.getTableType());
final boolean tableExists = syncClient.tableExists(tableName);
// if table exists and location of the metastore table doesn't match the
hoodie base path, recreate the table
if (tableExists &&
!FSUtils.comparePathsWithoutScheme(syncClient.getBasePath(),
syncClient.getTableLocation(tableName))) {
- LOG.info("basepath is updated for the table {}", tableName);
+ log.info("basepath is updated for the table {}", tableName);
recreateAndSyncHiveTable(tableName, useRealtimeInputFormat,
readAsOptimized);
return;
}
// Check if any sync is required
if (tableExists && isIncrementalSync() && isAlreadySynced(tableName)) {
- LOG.info("Table {} is already synced with the latest commit.",
tableName);
+ log.info("Table {} is already synced with the latest commit.",
tableName);
return;
}
// Get the parquet schema for this table looking at the latest commit
@@ -277,10 +269,10 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) ||
meetSyncConditions) {
syncClient.updateLastCommitTimeSynced(tableName);
}
- LOG.info("Sync complete for {}", tableName);
+ log.info("Sync complete for {}", tableName);
} catch (HoodieHiveSyncException ex) {
if (shouldRecreateAndSyncTable()) {
- LOG.warn("Failed to sync the table {}. Attempting trying to recreate",
tableName, ex);
+ log.warn("Failed to sync the table {}. Attempting trying to recreate",
tableName, ex);
recreateAndSyncHiveTable(tableName, useRealtimeInputFormat,
readAsOptimized);
} else {
throw new HoodieHiveSyncException("failed to sync the table " +
tableName, ex);
@@ -309,11 +301,11 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
}
} catch (Exception e) {
// this is harmless since table creation will fail anyways, creation
of DB is needed for in-memory testing
- LOG.warn("Unable to create database", e);
+ log.warn("Unable to create database", e);
}
} else {
if (!syncClient.databaseExists(databaseName)) {
- LOG.error("Hive database does not exist {}", databaseName);
+ log.error("Hive database does not exist {}", databaseName);
throw new HoodieHiveSyncException("hive database does not exist " +
databaseName);
}
}
@@ -326,10 +318,10 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
Option<String> lastCommitCompletionTimeSynced = (tableExists &&
syncIncremental)
? syncClient.getLastCommitCompletionTimeSynced(tableName) :
Option.empty();
if (syncIncremental) {
- LOG.info(String.format("Last commit time synced was found to be %s, last
commit completion time is found to be %s",
- lastCommitTimeSynced.orElse("null"),
lastCommitCompletionTimeSynced.orElse("null")));
+ log.info("Last commit time synced was found to be {}, last commit
completion time is found to be {}",
+ lastCommitTimeSynced.orElse("null"),
lastCommitCompletionTimeSynced.orElse("null"));
} else {
- LOG.info(
+ log.info(
"Executing a full partition sync operation since {} is set to
false.",
META_SYNC_INCREMENTAL.key());
}
@@ -340,19 +332,18 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
// If the last commit time synced is before the start of the active
timeline,
// the Hive sync falls back to list all partitions on storage, instead of
// reading active and archived timelines for written partitions.
- LOG.info("Sync all partitions given the last commit time synced is empty
or "
- + "before the start of the active timeline. Listing all partitions
in "
- + config.getString(META_SYNC_BASE_PATH)
- + ", file system: " + config.getHadoopFileSystem());
+ log.info("Sync all partitions given the last commit time synced is empty
or before the start of the active timeline. "
+ + "Listing all partitions in {}, file system: {}",
+ config.getString(META_SYNC_BASE_PATH), config.getHadoopFileSystem());
partitionsChanged = syncAllPartitions(tableName);
} else {
List<String> writtenPartitionsSince =
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced,
lastCommitCompletionTimeSynced);
- LOG.info("Storage partitions scan complete. Found " +
writtenPartitionsSince.size());
+ log.info("Storage partitions scan complete. Found {}",
writtenPartitionsSince.size());
// Sync the partitions if needed
// find dropped partitions, if any, in the latest commit
Set<String> droppedPartitions =
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced,
lastCommitCompletionTimeSynced);
- LOG.info("Partitions dropped since last sync: {}",
droppedPartitions.size());
+ log.info("Partitions dropped since last sync: {}",
droppedPartitions.size());
partitionsChanged = syncPartitions(tableName, writtenPartitionsSince,
droppedPartitions);
}
return partitionsChanged;
@@ -367,7 +358,7 @@ public class HiveSyncTool extends HoodieSyncTool implements
AutoCloseable {
}
private void recreateAndSyncHiveTable(String tableName, boolean
useRealtimeInputFormat, boolean readAsOptimized) {
- LOG.info("recreating and syncing the table {}", tableName);
+ log.info("recreating and syncing the table {}", tableName);
Timer.Context timerContext = metrics.getRecreateAndSyncTimer();
HoodieSchema schema =
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
try {
@@ -415,7 +406,7 @@ public class HiveSyncTool extends HoodieSyncTool implements
AutoCloseable {
}
private void syncFirstTime(String tableName, boolean useRealTimeInputFormat,
boolean readAsOptimized, HoodieSchema schema) {
- LOG.info("Sync table {} for the first time.", tableName);
+ log.info("Sync table {} for the first time.", tableName);
HoodieFileFormat baseFileFormat =
HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
String inputFormatClassName = getInputFormatClassName(baseFileFormat,
useRealTimeInputFormat);
String outputFormatClassName = getOutputFormatClassName(baseFileFormat);
@@ -439,9 +430,9 @@ public class HiveSyncTool extends HoodieSyncTool implements
AutoCloseable {
config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
config.getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE));
if (schemaDiff.isEmpty()) {
- LOG.info("No Schema difference for {}.", tableName);
+ log.info("No Schema difference for {}.", tableName);
} else {
- LOG.info("Schema difference found for {}. Updated schema: {}",
tableName, schema);
+ log.info("Schema difference found for {}. Updated schema: {}",
tableName, schema);
syncClient.updateTableSchema(tableName, schema, schemaDiff);
schemaChanged = true;
}
@@ -543,19 +534,19 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
private boolean syncPartitions(String tableName, List<PartitionEvent>
partitionEventList) {
List<String> newPartitions = filterPartitions(partitionEventList,
PartitionEventType.ADD);
if (!newPartitions.isEmpty()) {
- LOG.info("New Partitions " + newPartitions);
+ log.info("New Partitions {}", newPartitions);
syncClient.addPartitionsToTable(tableName, newPartitions);
}
List<String> updatePartitions = filterPartitions(partitionEventList,
PartitionEventType.UPDATE);
if (!updatePartitions.isEmpty()) {
- LOG.info("Changed Partitions " + updatePartitions);
+ log.info("Changed Partitions {}", updatePartitions);
syncClient.updatePartitionsToTable(tableName, updatePartitions);
}
List<String> dropPartitions = filterPartitions(partitionEventList,
PartitionEventType.DROP);
if (!dropPartitions.isEmpty()) {
- LOG.info("Drop Partitions " + dropPartitions);
+ log.info("Drop Partitions {}", dropPartitions);
syncClient.dropPartitions(tableName, dropPartitions);
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 6393ac95bcb5..0701305707b1 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -40,6 +40,7 @@ import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -47,8 +48,6 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
@@ -71,9 +70,9 @@ import static
org.apache.hudi.sync.common.util.TableUtils.tableId;
/**
* This class implements logic to sync a Hudi table with either the Hive
server or the Hive Metastore.
*/
+@Slf4j
public class HoodieHiveSyncClient extends HoodieSyncClient {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieHiveSyncClient.class);
protected final HiveSyncConfig config;
private final String databaseName;
private final Map<String, Table> initialTableByName = new HashMap<>();
@@ -196,7 +195,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
}
if (!shouldUpdate) {
- LOG.debug("Table {} serdeProperties and formatClass already up to
date, skip update.", tableName);
+ log.debug("Table {} serdeProperties and formatClass already up to
date, skip update.", tableName);
return false;
}
@@ -355,7 +354,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
Table table = client.getTable(databaseName, tableName);
return
Option.ofNullable(table.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP,
null));
} catch (NoSuchObjectException e) {
- LOG.error("database.table [{}.{}] not found in hms", databaseName,
tableName);
+ log.error("database.table [{}.{}] not found in hms", databaseName,
tableName);
return Option.empty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last replicated
time from the table " + tableName, e);
@@ -383,7 +382,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
String timestamp =
table.getParameters().remove(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
client.alter_table(databaseName, tableName, table);
if (timestamp != null) {
- LOG.info("deleted last replicated timestamp " + timestamp + " for
table " + tableName);
+ log.info("deleted last replicated timestamp {} for table {}",
timestamp, tableName);
}
} catch (NoSuchObjectException e) {
// this is ok the table doesn't even exist.
@@ -402,7 +401,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
client = null;
}
} catch (Exception e) {
- LOG.error("Could not close connection ", e);
+ log.error("Could not close connection ", e);
}
}
@@ -470,7 +469,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
}
});
if (alterComments.isEmpty()) {
- LOG.info(String.format("No comment difference of %s ", tableName));
+ log.info("No comment difference of {} ", tableName);
return false;
} else {
ddlExecutor.updateTableComments(tableName, alterComments);
@@ -487,7 +486,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
public void dropTable(String tableName) {
try {
client.dropTable(databaseName, tableName);
- LOG.info("Successfully deleted table in Hive: {}.{}", databaseName,
tableName);
+ log.info("Successfully deleted table in Hive: {}.{}", databaseName,
tableName);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to delete the table " +
tableId(databaseName, tableName), e);
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index d8112ffcd58d..81ba15660f39 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -30,6 +30,7 @@ import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.storage.StorageSchemes;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -46,8 +47,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -67,10 +66,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_F
/**
* DDLExecutor impl based on HMS which use HMS apis directly for all DDL tasks.
*/
+@Slf4j
public class HMSDDLExecutor implements DDLExecutor {
- private static final Logger LOG =
LoggerFactory.getLogger(HMSDDLExecutor.class);
-
private final HiveSyncConfig syncConfig;
private final String databaseName;
private final IMetaStoreClient client;
@@ -95,7 +93,7 @@ public class HMSDDLExecutor implements DDLExecutor {
Database database = new Database(databaseName, "automatically created by
hoodie", null, null);
client.createDatabase(database);
} catch (Exception e) {
- LOG.error("Failed to create database " + databaseName, e);
+ log.error("Failed to create database {}", databaseName, e);
throw new HoodieHiveSyncException("Failed to create database " +
databaseName, e);
}
}
@@ -137,7 +135,7 @@ public class HMSDDLExecutor implements DDLExecutor {
}
client.createTable(newTb);
} catch (Exception e) {
- LOG.error("failed to create table " + tableName, e);
+ log.error("failed to create table {}", tableName, e);
throw new HoodieHiveSyncException("failed to create table " + tableName,
e);
}
}
@@ -153,12 +151,12 @@ public class HMSDDLExecutor implements DDLExecutor {
table.setSd(sd);
EnvironmentContext environmentContext = new EnvironmentContext();
if (cascade) {
- LOG.info("partition table,need cascade");
+ log.info("partition table,need cascade");
environmentContext.putToProperties(StatsSetupConst.CASCADE,
StatsSetupConst.TRUE);
}
client.alter_table_with_environmentContext(databaseName, tableName,
table, environmentContext);
} catch (Exception e) {
- LOG.error("Failed to update table for " + tableName, e);
+ log.error("Failed to update table for {}", tableName, e);
throw new HoodieHiveSyncException("Failed to update table for " +
tableName, e);
}
}
@@ -180,7 +178,7 @@ public class HMSDDLExecutor implements DDLExecutor {
schema.putAll(columnsMap);
schema.putAll(partitionKeysMap);
final long end = System.currentTimeMillis();
- LOG.info(String.format("Time taken to getTableSchema: %s ms", (end -
start)));
+ log.info("Time taken to getTableSchema: {} ms", (end - start));
return schema;
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get table schema for : " +
tableName, e);
@@ -190,10 +188,10 @@ public class HMSDDLExecutor implements DDLExecutor {
@Override
public void addPartitionsToTable(String tableName, List<String>
partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
- LOG.info("No partitions to add for " + tableName);
+ log.info("No partitions to add for {}", tableName);
return;
}
- LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " +
tableName);
+ log.info("Adding partitions {} to table {}", partitionsToAdd.size(),
tableName);
try {
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
int batchSyncPartitionNum =
syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
@@ -212,10 +210,10 @@ public class HMSDDLExecutor implements DDLExecutor {
partitionList.add(new Partition(partitionValues, databaseName,
tableName, 0, 0, partitionSd, null));
});
client.add_partitions(partitionList, true, false);
- LOG.info("HMSDDLExecutor add a batch partitions done: " +
partitionList.size());
+ log.info("HMSDDLExecutor add a batch partitions done: {}",
partitionList.size());
}
} catch (TException e) {
- LOG.error(databaseName + "." + tableName + " add partition failed", e);
+ log.error("{}.{} add partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " add
partition failed", e);
}
}
@@ -223,10 +221,10 @@ public class HMSDDLExecutor implements DDLExecutor {
@Override
public void updatePartitionsToTable(String tableName, List<String>
changedPartitions) {
if (changedPartitions.isEmpty()) {
- LOG.info("No partitions to change for " + tableName);
+ log.info("No partitions to change for {}", tableName);
return;
}
- LOG.info("Changing partitions " + changedPartitions.size() + " on " +
tableName);
+ log.info("Changing partitions {} on {}", changedPartitions.size(),
tableName);
try {
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
List<Partition> partitionList = changedPartitions.stream().map(partition
-> {
@@ -241,7 +239,7 @@ public class HMSDDLExecutor implements DDLExecutor {
}).collect(Collectors.toList());
client.alter_partitions(databaseName, tableName, partitionList, null);
} catch (TException e) {
- LOG.error(databaseName + "." + tableName + " update partition failed",
e);
+ log.error("{}.{} update partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + "
update partition failed", e);
}
}
@@ -249,11 +247,11 @@ public class HMSDDLExecutor implements DDLExecutor {
@Override
public void dropPartitionsToTable(String tableName, List<String>
partitionsToDrop) {
if (partitionsToDrop.isEmpty()) {
- LOG.info("No partitions to drop for " + tableName);
+ log.info("No partitions to drop for {}", tableName);
return;
}
- LOG.info("Drop partitions " + partitionsToDrop.size() + " on " +
tableName);
+ log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName);
try {
for (String dropPartition : partitionsToDrop) {
if (HivePartitionUtil.partitionExists(client, tableName,
dropPartition, partitionValueExtractor, syncConfig)) {
@@ -261,10 +259,10 @@ public class HMSDDLExecutor implements DDLExecutor {
HivePartitionUtil.getPartitionClauseForDrop(dropPartition,
partitionValueExtractor, syncConfig);
client.dropPartition(databaseName, tableName, partitionClause,
false);
}
- LOG.info("Drop partition " + dropPartition + " on " + tableName);
+ log.info("Drop partition {} on {}", dropPartition, tableName);
}
} catch (TException e) {
- LOG.error(databaseName + "." + tableName + " drop partition failed", e);
+ log.error("{}.{} drop partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + "
drop partition failed", e);
}
}
@@ -285,7 +283,7 @@ public class HMSDDLExecutor implements DDLExecutor {
client.alter_table_with_environmentContext(databaseName, tableName,
table, environmentContext);
sd.clear();
} catch (Exception e) {
- LOG.error("Failed to update table comments for " + tableName, e);
+ log.error("Failed to update table comments for {}", tableName, e);
throw new HoodieHiveSyncException("Failed to update table comments for "
+ tableName, e);
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index 7cba6f9b7673..25434d29eb3f 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -23,6 +23,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.util.HivePartitionUtil;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -31,8 +32,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -47,10 +46,9 @@ import static
org.apache.hudi.sync.common.util.TableUtils.tableId;
/**
 * This class offers a DDL executor backed by the hive.ql Driver. This class
preserves the old useJDBC = false way of doing things.
*/
+@Slf4j
public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveQueryDDLExecutor.class);
-
private final IMetaStoreClient metaStoreClient;
private SessionState sessionState;
private Driver hiveDriver;
@@ -69,7 +67,7 @@ public class HiveQueryDDLExecutor extends
QueryBasedDDLExecutor {
try {
this.sessionState.close();
} catch (IOException ioException) {
- LOG.error("Error while closing SessionState", ioException);
+ log.error("Error while closing SessionState", ioException);
}
}
if (this.hiveDriver != null) {
@@ -91,7 +89,7 @@ public class HiveQueryDDLExecutor extends
QueryBasedDDLExecutor {
if (hiveDriver != null) {
HoodieTimer timer = HoodieTimer.start();
responses.add(hiveDriver.run(sql));
- LOG.info(String.format("Time taken to execute [%s]: %s ms", sql,
timer.endTimer()));
+ log.info("Time taken to execute [{}]: {} ms", sql, timer.endTimer());
}
}
} catch (Exception e) {
@@ -118,7 +116,7 @@ public class HiveQueryDDLExecutor extends
QueryBasedDDLExecutor {
schema.putAll(columnsMap);
schema.putAll(partitionKeysMap);
final long end = System.currentTimeMillis();
- LOG.info(String.format("Time taken to getTableSchema: %s ms", (end -
start)));
+ log.info("Time taken to getTableSchema: {} ms", (end - start));
return schema;
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get table schema for : " +
tableName, e);
@@ -128,11 +126,11 @@ public class HiveQueryDDLExecutor extends
QueryBasedDDLExecutor {
@Override
public void dropPartitionsToTable(String tableName, List<String>
partitionsToDrop) {
if (partitionsToDrop.isEmpty()) {
- LOG.info("No partitions to drop for " + tableName);
+ log.info("No partitions to drop for {}", tableName);
return;
}
- LOG.info("Drop partitions " + partitionsToDrop.size() + " on " +
tableName);
+ log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName);
try {
for (String dropPartition : partitionsToDrop) {
if (HivePartitionUtil.partitionExists(metaStoreClient, tableName,
dropPartition, partitionValueExtractor,
@@ -141,10 +139,10 @@ public class HiveQueryDDLExecutor extends
QueryBasedDDLExecutor {
HivePartitionUtil.getPartitionClauseForDrop(dropPartition,
partitionValueExtractor, config);
metaStoreClient.dropPartition(databaseName, tableName,
partitionClause, false);
}
- LOG.info("Drop partition " + dropPartition + " on " + tableName);
+ log.info("Drop partition {} on {}", dropPartition, tableName);
}
} catch (Exception e) {
- LOG.error(tableId(databaseName, tableName) + " drop partition failed",
e);
+ log.error("{} drop partition failed", tableId(databaseName, tableName),
e);
throw new HoodieHiveSyncException(tableId(databaseName, tableName) + "
drop partition failed", e);
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 5c640062384a..f0784497a9ff 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -21,8 +21,7 @@ package org.apache.hudi.hive.ddl;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -45,10 +44,9 @@ import static
org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
/**
 * This class offers a DDL executor backed by JDBC. This class preserves the
old useJDBC = true way of doing things.
*/
+@Slf4j
public class JDBCExecutor extends QueryBasedDDLExecutor {
- private static final Logger LOG =
LoggerFactory.getLogger(JDBCExecutor.class);
-
private Connection connection;
public JDBCExecutor(HiveSyncConfig config) {
@@ -64,7 +62,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
Statement stmt = null;
try {
stmt = connection.createStatement();
- LOG.info("Executing SQL " + s);
+ log.info("Executing SQL {}", s);
stmt.execute(s);
} catch (SQLException e) {
throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
@@ -79,7 +77,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
stmt.close();
}
} catch (SQLException e) {
- LOG.info("Could not close the statement opened ", e);
+ log.info("Could not close the statement opened ", e);
}
try {
@@ -87,7 +85,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
resultSet.close();
}
} catch (SQLException e) {
- LOG.info("Could not close the resultset opened ", e);
+ log.info("Could not close the resultset opened ", e);
}
}
@@ -96,13 +94,13 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
- LOG.error("Unable to load Hive driver class", e);
+ log.error("Unable to load Hive driver class", e);
return;
}
try {
this.connection = DriverManager.getConnection(jdbcUrl, hiveUser,
hivePass);
- LOG.info("Successfully established Hive connection to " + jdbcUrl);
+ log.info("Successfully established Hive connection to {}", jdbcUrl);
} catch (SQLException e) {
throw new HoodieHiveSyncException("Cannot create hive connection " +
getHiveJdbcUrlWithDefaultDBName(jdbcUrl), e);
}
@@ -152,10 +150,10 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
@Override
public void dropPartitionsToTable(String tableName, List<String>
partitionsToDrop) {
if (partitionsToDrop.isEmpty()) {
- LOG.info("No partitions to add for " + tableName);
+ log.info("No partitions to add for {}", tableName);
return;
}
- LOG.info("Dropping partitions " + partitionsToDrop.size() + " from table "
+ tableName);
+ log.info("Dropping partitions {} from table {}", partitionsToDrop.size(),
tableName);
List<String> sqls = constructDropPartitions(tableName, partitionsToDrop);
sqls.stream().forEach(sql -> runSQL(sql));
}
@@ -200,7 +198,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
connection.close();
}
} catch (SQLException e) {
- LOG.error("Could not close connection ", e);
+ log.error("Could not close connection ", e);
}
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index 6023b3135dc5..472bcedd328a 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -30,9 +30,8 @@ import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.storage.StorageSchemes;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -51,10 +50,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_F
/**
* This class adds functionality for all query based DDLExecutors. The classes
extending it only have to provide runSQL(sql) functions.
*/
+@Slf4j
public abstract class QueryBasedDDLExecutor implements DDLExecutor {
- private static final Logger LOG =
LoggerFactory.getLogger(QueryBasedDDLExecutor.class);
-
protected final HiveSyncConfig config;
protected final String databaseName;
protected final PartitionValueExtractor partitionValueExtractor;
@@ -89,7 +87,7 @@ public abstract class QueryBasedDDLExecutor implements
DDLExecutor {
String createSQLQuery =
HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, config,
inputFormatClass,
outputFormatClass, serdeClass, serdeProperties, tableProperties);
- LOG.info("Creating table with " + createSQLQuery);
+ log.info("Creating table with {}", createSQLQuery);
runSQL(createSQLQuery);
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to create table " + tableName,
e);
@@ -107,7 +105,7 @@ public abstract class QueryBasedDDLExecutor implements
DDLExecutor {
.append(HIVE_ESCAPE_CHARACTER).append(tableName)
.append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
.append(newSchemaStr).append(" )").append(cascadeClause);
- LOG.info("Updating table definition with " + sqlBuilder);
+ log.info("Updating table definition with {}", sqlBuilder);
runSQL(sqlBuilder.toString());
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to update table for " +
tableName, e);
@@ -117,10 +115,10 @@ public abstract class QueryBasedDDLExecutor implements
DDLExecutor {
@Override
public void addPartitionsToTable(String tableName, List<String>
partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
- LOG.info("No partitions to add for " + tableName);
+ log.info("No partitions to add for {}", tableName);
return;
}
- LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " +
tableName);
+ log.info("Adding partitions {} to table {}", partitionsToAdd.size(),
tableName);
List<String> sqls = constructAddPartitions(tableName, partitionsToAdd);
sqls.stream().forEach(sql -> runSQL(sql));
}
@@ -128,10 +126,10 @@ public abstract class QueryBasedDDLExecutor implements
DDLExecutor {
@Override
public void updatePartitionsToTable(String tableName, List<String>
changedPartitions) {
if (changedPartitions.isEmpty()) {
- LOG.info("No partitions to change for " + tableName);
+ log.info("No partitions to change for {}", tableName);
return;
}
- LOG.info("Changing partitions " + changedPartitions.size() + " on " +
tableName);
+ log.info("Changing partitions {} on {}", changedPartitions.size(),
tableName);
List<String> sqls = constructChangePartitions(tableName,
changedPartitions);
for (String sql : sqls) {
runSQL(sql);
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
index 5a2eab15c685..da6bae2342d9 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
@@ -22,9 +22,8 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.HiveSyncTool;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -32,9 +31,9 @@ import java.util.Properties;
import static
org.apache.hudi.hive.replication.GlobalHiveSyncConfig.META_SYNC_GLOBAL_REPLICATE_TIMESTAMP;
+@Slf4j
public class GlobalHiveSyncTool extends HiveSyncTool {
- private static final Logger LOG =
LoggerFactory.getLogger(GlobalHiveSyncTool.class);
protected final GlobalHiveSyncConfig config;
public GlobalHiveSyncTool(Properties props, Configuration hadoopConf) {
@@ -53,9 +52,9 @@ public class GlobalHiveSyncTool extends HiveSyncTool {
Option<String> timestamp =
Option.ofNullable(config.getString(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP));
if (timestamp.isPresent()) {
syncClient.updateLastReplicatedTimeStamp(tableName, timestamp.get());
- LOG.info("Sync complete for {}", tableName);
+ log.info("Sync complete for {}", tableName);
} else {
- LOG.warn("Sync skipped: {} is not set.",
META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key());
+ log.warn("Sync skipped: {} is not set.",
META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key());
}
}
@@ -75,10 +74,10 @@ public class GlobalHiveSyncTool extends HiveSyncTool {
Option<String> timestamp = timeStampMap.get(tableName);
if (timestamp.isPresent()) {
syncClient.updateLastReplicatedTimeStamp(tableName, timestamp.get());
- LOG.info("updated timestamp for " + tableName + " to: " +
timestamp.get());
+ log.info("updated timestamp for {} to: {}", tableName,
timestamp.get());
} else {
syncClient.deleteLastReplicatedTimeStamp(tableName);
- LOG.info("deleted timestamp for " + tableName);
+ log.info("deleted timestamp for {}", tableName);
}
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitParams.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitParams.java
index 08252f9297bb..2e5ffc1f1e3d 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitParams.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitParams.java
@@ -24,8 +24,7 @@ import org.apache.hudi.common.util.StringUtils;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.ParametersDelegate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.FileInputStream;
import java.io.IOException;
@@ -44,10 +43,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+ " The tool tries to be transactional but does not guarantee it. If the
sync fails midway in one cluster it will try to roll back the committed "
+ " timestamp from already successful sync on other clusters but that can
also fail."
+ " The tool does not roll back any synced partitions but only the
timestamp.")
+@Slf4j
public class HiveSyncGlobalCommitParams {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveSyncGlobalCommitParams.class);
-
public static String LOCAL_HIVE_SITE_URI =
"hivesyncglobal.local_hive_site_uri";
public static String REMOTE_HIVE_SITE_URI =
"hivesyncglobal.remote_hive_site_uri";
public static String REMOTE_BASE_PATH = "hivesyncglobal.remote_base_path";
@@ -92,8 +90,7 @@ public class HiveSyncGlobalCommitParams {
String jdbcUrl = forRemote ?
loadedProps.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
: loadedProps.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS,
loadedProps.getProperty(HIVE_URL.key()));
props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
- LOG.info("building hivesync config forRemote: " + forRemote + " " +
jdbcUrl + " "
- + basePath);
+ log.info("building hivesync config forRemote: {} {} {}", forRemote,
jdbcUrl, basePath);
return props;
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
index 9c7f8e55ecd7..b69b6f365b16 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
@@ -21,10 +21,9 @@ package org.apache.hudi.hive.replication;
import org.apache.hudi.hive.HoodieHiveSyncException;
import com.beust.jcommander.JCommander;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,9 +32,9 @@ import java.util.List;
import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SITE_URI;
import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SITE_URI;
+@Slf4j
public class HiveSyncGlobalCommitTool implements HiveSyncGlobalCommit,
AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveSyncGlobalCommitTool.class);
private final HiveSyncGlobalCommitParams params;
private final List<ReplicationStateSync> replicationStateSyncList;
@@ -60,26 +59,26 @@ public class HiveSyncGlobalCommitTool implements
HiveSyncGlobalCommit, AutoClose
try {
for (ReplicationStateSync stateSync : replicationStateSyncList) {
Thread.currentThread().setName(stateSync.getClusterId());
- LOG.info("starting sync for state " + stateSync);
+ log.info("starting sync for state {}", stateSync);
stateSync.sync();
- LOG.info("synced state " + stateSync);
+ log.info("synced state {}", stateSync);
}
} catch (Exception e) {
Thread.currentThread().setName(name);
- LOG.error(String.format("Error while trying to commit replication state
%s", e.getMessage()), e);
+ log.error("Error while trying to commit replication state {}",
e.getMessage(), e);
return false;
} finally {
Thread.currentThread().setName(name);
}
- LOG.info("done syncing to all tables, verifying the timestamps...");
+ log.info("done syncing to all tables, verifying the timestamps...");
ReplicationStateSync base = replicationStateSyncList.get(0);
boolean success = true;
- LOG.info("expecting all timestamps to be similar to: " + base);
+ log.info("expecting all timestamps to be similar to: {}", base);
for (int idx = 1; idx < replicationStateSyncList.size(); ++idx) {
ReplicationStateSync other = replicationStateSyncList.get(idx);
if (!base.replicationStateIsInSync(other)) {
- LOG.error("the timestamp of other : " + other + " is not matching with
base: " + base);
+ log.error("the timestamp of other: {} is not matching with base: {}",
other, base);
success = false;
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
index c2ed57e6becb..deb84ff48a53 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hive.replication;
import org.apache.hudi.common.util.Option;
+import lombok.Getter;
import org.apache.hadoop.hive.conf.HiveConf;
import java.util.Map;
@@ -30,6 +31,7 @@ public class ReplicationStateSync implements AutoCloseable {
protected GlobalHiveSyncTool globalHiveSyncTool;
private Map<String, Option<String>> replicatedTimeStampMap;
private Map<String, Option<String>> oldReplicatedTimeStampMap;
+ @Getter
private final String clusterId;
ReplicationStateSync(Properties props, HiveConf hiveConf, String uid) {
@@ -71,10 +73,6 @@ public class ReplicationStateSync implements AutoCloseable {
return "{ clusterId: " + clusterId + " replicatedState: " +
replicatedTimeStampMap + " }";
}
- public String getClusterId() {
- return clusterId;
- }
-
@Override
public void close() {
if (globalHiveSyncTool != null) {
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
index 2e9dc10e644c..80a95801fbdb 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
@@ -27,6 +27,8 @@ import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.hive.util.IMetaStoreClientUtil;
import org.apache.hudi.storage.StorageConfiguration;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -41,8 +43,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;
@@ -76,14 +76,15 @@ import static
org.apache.hudi.common.lock.LockState.RELEASING;
* using hive metastore APIs. Users need to have a HiveMetastore & Zookeeper
cluster deployed to be able to use this lock.
*
*/
+@Slf4j
public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse>, Serializable {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveMetastoreBasedLockProvider.class);
-
private final String databaseName;
private final String tableName;
private final String hiveMetastoreUris;
+ @Getter
private transient IMetaStoreClient hiveClient;
+ @Getter
private volatile LockResponse lock = null;
protected LockConfiguration lockConfiguration;
private transient ScheduledFuture<?> future = null;
@@ -116,7 +117,7 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
@Override
public boolean tryLock(long time, TimeUnit unit) {
- LOG.info(generateLogStatement(ACQUIRING, generateLogSuffixString()));
+ log.info(generateLogStatement(ACQUIRING, generateLogSuffixString()));
try {
acquireLock(time, unit);
} catch (ExecutionException | InterruptedException | TimeoutException |
TException e) {
@@ -128,7 +129,7 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
@Override
public void unlock() {
try {
- LOG.info(generateLogStatement(RELEASING, generateLogSuffixString()));
+ log.info(generateLogStatement(RELEASING, generateLogSuffixString()));
LockResponse lockResponseLocal = lock;
if (lockResponseLocal == null) {
return;
@@ -138,7 +139,7 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
future.cancel(false);
}
hiveClient.unlock(lockResponseLocal.getLockid());
- LOG.info(generateLogStatement(RELEASED, generateLogSuffixString()));
+ log.info(generateLogStatement(RELEASED, generateLogSuffixString()));
} catch (TException e) {
throw new HoodieLockException(generateLogStatement(FAILED_TO_RELEASE,
generateLogSuffixString()), e);
}
@@ -168,19 +169,10 @@ public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse
Hive.closeCurrent();
executor.shutdown();
} catch (Exception e) {
-
LOG.error(generateLogStatement(org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
+
log.error(generateLogStatement(org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
}
}
- public IMetaStoreClient getHiveClient() {
- return hiveClient;
- }
-
- @Override
- public LockResponse getLock() {
- return this.lock;
- }
-
// This API is exposed for tests and not intended to be used elsewhere
public boolean acquireLock(long time, TimeUnit unit, final LockComponent
component)
throws InterruptedException, ExecutionException, TimeoutException,
TException {
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
index 76a82406ac2f..3e75582266df 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
@@ -24,12 +24,11 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -38,8 +37,8 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NA
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DECODE_PARTITION;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+@Slf4j
public class HivePartitionUtil {
- private static final Logger LOG =
LoggerFactory.getLogger(HivePartitionUtil.class);
/**
* Build String, example as year=2021/month=06/day=25
@@ -72,7 +71,7 @@ public class HivePartitionUtil {
} catch (NoSuchObjectException ignored) {
newPartition = null;
} catch (TException e) {
- LOG.error("Failed to get partition " + partitionPath, e);
+ log.error("Failed to get partition {}", partitionPath, e);
throw new HoodieHiveSyncException("Failed to get partition " +
partitionPath, e);
}
return newPartition != null;
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index a27972502cb7..2741cdfbfe19 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -24,9 +24,8 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.SchemaDifference;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -47,9 +46,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_F
/**
* Schema Utilities.
*/
+@Slf4j
public class HiveSchemaUtil {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveSchemaUtil.class);
public static final String HIVE_ESCAPE_CHARACTER = "`";
public static final String BOOLEAN_TYPE_NAME = "boolean";
@@ -77,7 +76,7 @@ public class HiveSchemaUtil {
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to convert schema to hive
schema", e);
}
- LOG.debug("Getting schema difference for {} \r\n\r\n{}", tableSchema,
newTableSchema);
+ log.debug("Getting schema difference for {} \r\n\r\n{}", tableSchema,
newTableSchema);
SchemaDifference.Builder schemaDiffBuilder =
SchemaDifference.newBuilder(storageSchema, tableSchema);
Set<String> tableColumns = new HashSet<>();
@@ -96,7 +95,7 @@ public class HiveSchemaUtil {
continue;
}
// We will log this and continue. Hive schema is a superset of all
schemas
- LOG.info("Ignoring table column {} as its not present in the table
schema", fieldName);
+ log.info("Ignoring table column {} as its not present in the table
schema", fieldName);
continue;
}
tableColumnType = tableColumnType.replaceAll("\\s+", "");
@@ -118,7 +117,7 @@ public class HiveSchemaUtil {
}
}
SchemaDifference result = schemaDiffBuilder.build();
- LOG.debug("Difference between schemas: {}", result);
+ log.debug("Difference between schemas: {}", result);
return result;
}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
index bbb25562e5f0..e3d53932461a 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
@@ -22,6 +22,8 @@ import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.storage.StoragePath;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -45,8 +47,6 @@ import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -59,9 +59,8 @@ import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+@Slf4j
public class HiveTestService {
-
- private static final Logger LOG =
LoggerFactory.getLogger(HiveTestService.class);
private static final int CONNECTION_TIMEOUT_MS = 30000;
private static final String BIND_HOST = "127.0.0.1";
private static final int HS2_THRIFT_PORT = 9999;
@@ -72,7 +71,9 @@ public class HiveTestService {
private final Map<String, String> sysProps = new HashMap<>();
private ExecutorService executorService;
private TServer tServer;
+ @Getter
private HiveServer2 hiveServer;
+ @Getter
private HiveConf hiveConf;
public HiveTestService(Configuration hadoopConf) throws IOException {
@@ -84,7 +85,7 @@ public class HiveTestService {
Objects.requireNonNull(workDir, "The work dir must be set before starting
cluster.");
String localHiveLocation = getHiveLocation(workDir);
- LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and
starting fresh.");
+ log.info("Cleaning Hive cluster data at: {} and starting fresh.",
localHiveLocation);
File file = new File(localHiveLocation);
FileIOUtils.deleteDirectory(file);
@@ -99,7 +100,7 @@ public class HiveTestService {
throw new IOException("Waiting for startup of standalone server");
}
- LOG.info("Hive Minicluster service started.");
+ log.info("Hive Minicluster service started.");
return hiveServer;
}
@@ -109,32 +110,24 @@ public class HiveTestService {
try {
tServer.stop();
} catch (Exception e) {
- LOG.error("Stop meta store failed", e);
+ log.error("Stop meta store failed", e);
}
}
if (hiveServer != null) {
try {
hiveServer.stop();
} catch (Exception e) {
- LOG.error("Stop hive server failed", e);
+ log.error("Stop hive server failed", e);
}
}
if (executorService != null) {
executorService.shutdownNow();
}
- LOG.info("Hive Minicluster service shut down.");
+ log.info("Hive Minicluster service shut down.");
tServer = null;
hiveServer = null;
}
- public HiveServer2 getHiveServer() {
- return hiveServer;
- }
-
- public HiveConf getHiveConf() {
- return hiveConf;
- }
-
public int getHiveServerPort() {
return hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
}
@@ -174,7 +167,7 @@ public class HiveTestService {
}
private boolean waitForServerUp(HiveConf serverConf) {
- LOG.info("waiting for " + serverConf.getVar(ConfVars.METASTOREURIS));
+ log.info("waiting for {}", serverConf.getVar(ConfVars.METASTOREURIS));
final long start = System.currentTimeMillis();
while (true) {
try {
@@ -297,11 +290,11 @@ public class HiveTestService {
: new TUGIContainingTransport.Factory();
processor = new TUGIBasedProcessor<>(handler);
- LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
+ log.info("Starting DB backed MetaStore Server with SetUGI enabled");
} else {
transFactory = useFramedTransport ? new TFramedTransport.Factory() :
new TTransportFactory();
processor = new TSetIpAddressProcessor<>(handler);
- LOG.info("Starting DB backed MetaStore Server");
+ log.info("Starting DB backed MetaStore Server");
}
TThreadPoolServer.Args args = new
TThreadPoolServer.Args(serverTransport).processor(processor)
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index ebe139dba5f9..1ab5bb522422 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -61,9 +61,11 @@ import org.apache.hudi.hive.util.IMetaStoreClientUtil;
import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.storage.HoodieInstantWriter;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -77,8 +79,6 @@ import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.platform.commons.JUnitException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -116,9 +116,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_F
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.fail;
+@Slf4j
@SuppressWarnings("SameParameterValue")
public class HiveTestUtil {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveTestUtil.class);
public static final String DB_NAME = "testdb";
public static final String TABLE_NAME = "test1";
@@ -133,8 +133,10 @@ public class HiveTestUtil {
private static HiveServer2 hiveServer;
private static ZookeeperTestService zkService;
private static Configuration configuration;
+ @Getter
public static HiveSyncConfig hiveSyncConfig;
private static DateTimeFormatter dtfOut;
+ @Getter
private static Set<String> createdTablesSet = new HashSet<>();
public static void setUp(Option<TypedProperties> hiveSyncProperties, boolean
shouldClearBasePathAndTables) throws Exception {
@@ -202,10 +204,6 @@ public class HiveTestUtil {
return hiveServer.getHiveConf();
}
- public static HiveSyncConfig getHiveSyncConfig() {
- return hiveSyncConfig;
- }
-
public static void shutdown() {
List<String> failedReleases = new ArrayList<>();
try {
@@ -271,7 +269,7 @@ public class HiveTestUtil {
}
if (!failedReleases.isEmpty()) {
- LOG.error("Exception happened during releasing: " + String.join(",",
failedReleases));
+ log.error("Exception happened during releasing: {}", String.join(",",
failedReleases));
}
}
@@ -330,7 +328,7 @@ public class HiveTestUtil {
storage.deleteFile(path);
}
} catch (IOException e) {
- LOG.warn("Error deleting file: ", e);
+ log.warn("Error deleting file: ", e);
}
});
}
@@ -816,8 +814,4 @@ public class HiveTestUtil {
}
}
}
-
- public static Set<String> getCreatedTablesSet() {
- return createdTablesSet;
- }
}
diff --git a/hudi-sync/hudi-sync-common/pom.xml
b/hudi-sync/hudi-sync-common/pom.xml
index 52acee47f649..1861e6b155ca 100644
--- a/hudi-sync/hudi-sync-common/pom.xml
+++ b/hudi-sync/hudi-sync-common/pom.xml
@@ -38,6 +38,12 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<!-- Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/hive/SchemaDifference.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/hive/SchemaDifference.java
index 0f9b2acff9f8..cdf0d8ddea67 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/hive/SchemaDifference.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/hive/SchemaDifference.java
@@ -20,6 +20,8 @@ package org.apache.hudi.hive;
import org.apache.hudi.common.schema.HoodieSchema;
+import lombok.Getter;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -35,8 +37,11 @@ public class SchemaDifference {
private final HoodieSchema storageSchema;
private final Map<String, String> tableSchema;
+ @Getter
private final List<String> deleteColumns;
+ @Getter
private final Map<String, String> updateColumnTypes;
+ @Getter
private final Map<String, String> addColumnTypes;
private SchemaDifference(HoodieSchema storageSchema, Map<String, String>
tableSchema, List<String> deleteColumns,
@@ -48,18 +53,6 @@ public class SchemaDifference {
this.addColumnTypes = Collections.unmodifiableMap(addColumnTypes);
}
- public List<String> getDeleteColumns() {
- return deleteColumns;
- }
-
- public Map<String, String> getUpdateColumnTypes() {
- return updateColumnTypes;
- }
-
- public Map<String, String> getAddColumnTypes() {
- return addColumnTypes;
- }
-
public static Builder newBuilder(HoodieSchema storageSchema, Map<String,
String> tableSchema) {
return new Builder(storageSchema, tableSchema);
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 85793aa082b6..04a6e4a10db1 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -34,9 +34,9 @@ import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionEvent;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@@ -53,12 +53,12 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_E
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;
+@Slf4j
public abstract class HoodieSyncClient implements HoodieMetaSyncOperations,
AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSyncClient.class);
-
protected final HoodieSyncConfig config;
protected final PartitionValueExtractor partitionValueExtractor;
+ @Getter
protected final HoodieTableMetaClient metaClient;
protected final ParquetTableSchemaResolver tableSchemaResolver;
private static final String TEMP_SUFFIX = "_temp";
@@ -86,10 +86,6 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
}
- public HoodieTableMetaClient getMetaClient() {
- return metaClient;
- }
-
public String getTableName() {
return config.getString(META_SYNC_TABLE_NAME);
}
@@ -154,11 +150,11 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
public List<String> getWrittenPartitionsSince(Option<String>
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
- LOG.info("Last commit time synced is not known, listing all partitions
in {} , FS: {}",
+ log.info("Last commit time synced is not known, listing all partitions
in {} , FS: {}",
config.getString(META_SYNC_BASE_PATH), config.getHadoopFileSystem());
return getAllPartitionPathsOnStorage();
} else {
- LOG.info("Last commit time synced is {}, Getting commits since then",
lastCommitTimeSynced.get());
+ log.info("Last commit time synced is {}, Getting commits since then",
lastCommitTimeSynced.get());
return TimelineUtils.getWrittenPartitions(
TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced));
}
@@ -210,7 +206,7 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
metaClient.getBasePath(), new StoragePath(storagePath));
events.add(PartitionEvent.newPartitionDropEvent(relativePath));
} catch (IllegalArgumentException e) {
- LOG.error("Cannot parse the path stored in the metastore, ignoring it
for generating DROP partition event: \"{}\".",
+ log.error("Cannot parse the path stored in the metastore, ignoring it
for generating DROP partition event: \"{}\".",
storagePath, e);
}
});
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index 7fd76ef52390..f23c7584e653 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -33,10 +33,11 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import com.beust.jcommander.Parameter;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.Immutable;
@@ -59,14 +60,13 @@ import static
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIO
* Configs needed to sync data into external meta stores, catalogs, etc.
*/
@Immutable
+@Slf4j
@ConfigClassProperty(name = "Common Metadata Sync Configs",
groupName = ConfigGroups.Names.META_SYNC,
areCommonConfigs = true,
description = "")
public class HoodieSyncConfig extends HoodieConfig {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSyncConfig.class);
-
public static final ConfigProperty<String> META_SYNC_BASE_PATH =
ConfigProperty
.key(META_SYNC_BASE_PATH_KEY)
.defaultValue("")
@@ -201,7 +201,10 @@ public class HoodieSyncConfig extends HoodieConfig {
+ "This is useful when the partition metadata is large, and the
partition info can be "
+ "obtained from Hudi's internal metadata table. Note, " +
HoodieMetadataConfig.ENABLE + " must be set to true.");
+ @Getter
+ @Setter
private Configuration hadoopConf;
+ @Getter
private final HoodieMetricsConfig metricsConfig;
public HoodieSyncConfig(Properties props) {
@@ -210,8 +213,8 @@ public class HoodieSyncConfig extends HoodieConfig {
public HoodieSyncConfig(Properties props, Configuration hadoopConf) {
super(props);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Passed in properties:\n" + props.entrySet()
+ if (log.isDebugEnabled()) {
+ log.debug("Passed in properties:\n{}", props.entrySet()
.stream()
.sorted(Comparator.comparing(e -> e.getKey().toString()))
.map(e -> e.getKey() + "=" + e.getValue())
@@ -226,18 +229,6 @@ public class HoodieSyncConfig extends HoodieConfig {
return getString(BASE_PATH);
}
- public void setHadoopConf(Configuration hadoopConf) {
- this.hadoopConf = hadoopConf;
- }
-
- public Configuration getHadoopConf() {
- return hadoopConf;
- }
-
- public HoodieMetricsConfig getMetricsConfig() {
- return metricsConfig;
- }
-
public FileSystem getHadoopFileSystem() {
return HadoopFSUtils.getFs(getString(META_SYNC_BASE_PATH),
getHadoopConf());
}
@@ -287,13 +278,10 @@ public class HoodieSyncConfig extends HoodieConfig {
@Parameter(names = {"--sync-no-partition-metadata"}, description = "do not
sync partition metadata info to the catalog")
public Boolean shouldNotSyncPartitionMetadata;
+ @Getter
@Parameter(names = {"--help", "-h"}, help = true)
public boolean help = false;
- public boolean isHelp() {
- return help;
- }
-
public TypedProperties toProps() {
final TypedProperties props = new TypedProperties();
props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), basePath);
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
index 4013be509fa2..c21adf7bc812 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/metrics/HoodieMetaSyncMetrics.java
@@ -29,12 +29,12 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class HoodieMetaSyncMetrics {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMetaSyncMetrics.class);
private static final String TIMER_METRIC_EXTENSION = ".timer";
private static final String COUNTER_METRIC_EXTENSION = ".counter";
private static final String META_SYNC_RECREATE_TABLE_METRIC =
"meta_sync.recreate_table";
@@ -42,6 +42,7 @@ public class HoodieMetaSyncMetrics {
private static final String META_SYNC_ACTION = "meta_sync";
private static final String RECREATE_TABLE_DURATION_MS_METRIC =
"recreate_table_duration_ms";
// Metrics are shut down by the shutdown hook added in the Metrics class
+ @Getter
private Metrics metrics;
private final HoodieMetricsConfig metricsConfig;
private transient HoodieStorage storage;
@@ -65,10 +66,6 @@ public class HoodieMetaSyncMetrics {
}
}
- public Metrics getMetrics() {
- return metrics;
- }
-
public Timer.Context getRecreateAndSyncTimer() {
if (metricsConfig.isMetricsOn() && recreateAndSyncTimer == null) {
recreateAndSyncTimer = createTimer(recreateAndSyncTimerName);
@@ -88,7 +85,7 @@ public class HoodieMetaSyncMetrics {
public void updateRecreateAndSyncDurationInMs(long durationInNs) {
if (metricsConfig.isMetricsOn()) {
long durationInMs = getDurationInMs(durationInNs);
- LOG.info("Sending recreate and sync metrics {}", durationInMs);
+ log.info("Sending recreate and sync metrics {}", durationInMs);
metrics.registerGauge(getMetricsName(META_SYNC_ACTION,
RECREATE_TABLE_DURATION_MS_METRIC), durationInMs);
}
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/FieldSchema.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/FieldSchema.java
index d9506be0809a..9314d7d2c7ff 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/FieldSchema.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/FieldSchema.java
@@ -21,8 +21,15 @@ package org.apache.hudi.sync.common.model;
import org.apache.hudi.common.util.Option;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
import java.util.Objects;
+@AllArgsConstructor
+@Getter
+@Setter
public class FieldSchema {
private final String name;
@@ -37,40 +44,10 @@ public class FieldSchema {
this(name, type, Option.ofNullable(comment));
}
- public FieldSchema(String name, String type, Option<String> comment) {
- this.name = name;
- this.type = type;
- this.comment = comment;
- }
-
- public String getName() {
- return name;
- }
-
- public String getType() {
- return type;
- }
-
- public Option<String> getComment() {
- return comment;
- }
-
public String getCommentOrEmpty() {
return comment.orElse("");
}
- public void setType(String type) {
- this.type = type;
- }
-
- public void setComment(Option<String> comment) {
- this.comment = comment;
- }
-
- public void setComment(String comment) {
- this.comment = Option.ofNullable(comment);
- }
-
public boolean updateComment(FieldSchema another) {
if (Objects.equals(name, another.getName())
&& !Objects.equals(getCommentOrEmpty(), another.getCommentOrEmpty())) {
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/Partition.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/Partition.java
index 8e2076f95cb9..d5caa853c3c8 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/Partition.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/Partition.java
@@ -19,24 +19,16 @@
package org.apache.hudi.sync.common.model;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
import java.util.List;
+@AllArgsConstructor
+@Getter
public class Partition {
private final List<String> values;
private final String storageLocation;
-
- public Partition(List<String> values, String storageLocation) {
- this.values = values;
- this.storageLocation = storageLocation;
- }
-
- public List<String> getValues() {
- return values;
- }
-
- public String getStorageLocation() {
- return storageLocation;
- }
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
index 07533ee90087..567d9135099c 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
@@ -32,9 +32,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.OutputStream;
@@ -44,12 +43,12 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+@Slf4j
public class ManifestFileWriter {
public static final String MANIFEST_FOLDER_NAME = "manifest";
public static final String ABSOLUTE_PATH_MANIFEST_FOLDER_NAME =
"absolute-path-manifest";
public static final String MANIFEST_FILE_NAME = "latest-snapshot.csv";
- private static final Logger LOG =
LoggerFactory.getLogger(ManifestFileWriter.class);
private final HoodieTableMetaClient metaClient;
private final boolean useFileListingFromMetadata;
@@ -67,10 +66,10 @@ public class ManifestFileWriter {
List<String> baseFiles =
fetchLatestBaseFilesForAllPartitions(useAbsolutePath)
.collect(Collectors.toList());
if (baseFiles.isEmpty()) {
- LOG.warn("No base file to generate manifest file.");
+ log.warn("No base file to generate manifest file.");
return;
} else {
- LOG.info("Writing base file names to manifest file: {}",
baseFiles.size());
+ log.info("Writing base file names to manifest file: {}",
baseFiles.size());
}
final StoragePath manifestFilePath =
getManifestFilePath(useAbsolutePath);
try (OutputStream outputStream =
metaClient.getStorage().create(manifestFilePath, true);
@@ -105,12 +104,10 @@ public class ManifestFileWriter {
static Stream<String> getLatestBaseFiles(boolean canUseMetadataTable,
HoodieEngineContext engContext, HoodieTableMetaClient metaClient,
boolean useAbsolutePath) {
List<String> partitions = FSUtils.getAllPartitionPaths(engContext,
metaClient, canUseMetadataTable);
- LOG.info("Retrieve all partitions: {}", partitions.size());
- HoodieTableFileSystemView fsView = null;
- try {
- fsView =
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engContext,
metaClient,
-
HoodieMetadataConfig.newBuilder().enable(canUseMetadataTable).build(),
-
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+ log.info("Retrieve all partitions: {}", partitions.size());
+ try (HoodieTableFileSystemView fsView =
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engContext,
metaClient,
+ HoodieMetadataConfig.newBuilder().enable(canUseMetadataTable).build(),
+
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()))
{
if (canUseMetadataTable) {
// incase of MDT, we can load all partitions at once. If not for MDT,
we can rely on fsView.getLatestBaseFiles(partition) for each partition to load
from FS.
fsView.loadAllPartitions();
@@ -120,8 +117,6 @@ public class ManifestFileWriter {
// fails the getLatestBaseFiles call. Hence we collect and return a
stream.
return partitions.parallelStream().flatMap(partition ->
finalFsView.getLatestBaseFiles(partition)
.map(useAbsolutePath ? HoodieBaseFile::getPath :
HoodieBaseFile::getFileName)).collect(Collectors.toList()).stream();
- } finally {
- fsView.close();
}
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index 080dd2c1891d..ed38156f3580 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -27,10 +27,9 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetaSyncException;
import org.apache.hudi.sync.common.HoodieSyncTool;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
@@ -46,8 +45,8 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
/**
* Helper class for syncing Hudi commit data with external metastores.
*/
+@Slf4j
public class SyncUtilHelpers {
- private static final Logger LOG =
LoggerFactory.getLogger(SyncUtilHelpers.class);
// Locks for each table (base path) to avoid concurrent modification of the
same underneath meta storage.
// Meta store such as Hive may encounter {@code
ConcurrentModificationException} for #alter_table.
@@ -126,7 +125,7 @@ public class SyncUtilHelpers {
if (properties.containsKey(META_SYNC_TABLE_NAME.key())) {
String tableName = properties.getString(META_SYNC_TABLE_NAME.key());
if (!tableName.equals(tableName.toLowerCase())) {
- LOG.warn(
+ log.warn(
"Table name \"{}\" contains capital letters. Your metastore may
automatically convert this to lower case and can cause table not found errors
during subsequent syncs.", tableName);
}
}