This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new b9fab0b Revert "[HUDI-455] Redo hudi-client log statements using SLF4J (#1145)" (#1181) b9fab0b is described below commit b9fab0b933315d88e059a07a0dcf2397d5b69d14 Author: hejinbiao123 <38057507+hejinbiao...@users.noreply.github.com> AuthorDate: Mon Jan 6 21:13:29 2020 +0800 Revert "[HUDI-455] Redo hudi-client log statements using SLF4J (#1145)" (#1181) This reverts commit e637d9ed26fea1a336f2fd6139cde0dd192c429d. --- hudi-client/pom.xml | 5 -- .../java/org/apache/hudi/AbstractHoodieClient.java | 6 +-- .../org/apache/hudi/CompactionAdminClient.java | 17 ++++--- .../java/org/apache/hudi/HoodieCleanClient.java | 16 +++---- .../java/org/apache/hudi/HoodieReadClient.java | 6 +-- .../java/org/apache/hudi/HoodieWriteClient.java | 56 +++++++++++----------- .../client/embedded/EmbeddedTimelineService.java | 10 ++-- .../hbase/DefaultHBaseQPSResourceAllocator.java | 10 ++-- .../org/apache/hudi/index/hbase/HBaseIndex.java | 42 ++++++++-------- .../org/apache/hudi/io/HoodieAppendHandle.java | 20 ++++---- .../java/org/apache/hudi/io/HoodieCleanHelper.java | 16 ++++--- .../org/apache/hudi/io/HoodieCommitArchiveLog.java | 22 ++++----- .../org/apache/hudi/io/HoodieCreateHandle.java | 17 +++---- .../org/apache/hudi/io/HoodieKeyLookupHandle.java | 19 ++++---- .../java/org/apache/hudi/io/HoodieMergeHandle.java | 39 +++++++-------- .../java/org/apache/hudi/io/HoodieWriteHandle.java | 10 ++-- .../io/compact/HoodieRealtimeTableCompactor.java | 28 +++++------ .../org/apache/hudi/metrics/HoodieMetrics.java | 17 ++++--- .../apache/hudi/metrics/JmxMetricsReporter.java | 6 +-- .../main/java/org/apache/hudi/metrics/Metrics.java | 6 +-- .../hudi/metrics/MetricsGraphiteReporter.java | 6 +-- .../hudi/metrics/MetricsReporterFactory.java | 8 ++-- .../apache/hudi/table/HoodieCopyOnWriteTable.java | 52 ++++++++++---------- .../apache/hudi/table/HoodieMergeOnReadTable.java | 27 ++++++----- .../java/org/apache/hudi/table/HoodieTable.java | 12 ++--- .../org/apache/hudi/table/RollbackExecutor.java | 14 +++--- 26 files changed, 245 insertions(+), 242 deletions(-) diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index 66538e0..d350777 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -85,11 +85,6 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>${slf4j.version}</version> - </dependency> <!-- Parquet --> <dependency> diff --git a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java index 8457b90..dd108be 100644 --- a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java @@ -26,9 +26,9 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -39,7 +39,7 @@ import java.io.Serializable; */ public abstract class AbstractHoodieClient implements Serializable, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieClient.class); + private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class); protected final transient FileSystem fs; protected final transient JavaSparkContext jsc; diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java index 00e0f75..56a47b7 100644 --- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java @@ -45,9 +45,9 @@ import org.apache.hudi.func.OperationResult; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; @@ -65,7 +65,7 @@ import static org.apache.hudi.common.table.HoodieTimeline.COMPACTION_ACTION; */ public class CompactionAdminClient extends AbstractHoodieClient { - private static final Logger LOG = LoggerFactory.getLogger(CompactionAdminClient.class); + private static final Logger LOG = LogManager.getLogger(CompactionAdminClient.class); public CompactionAdminClient(JavaSparkContext jsc, String basePath) { super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build()); @@ -358,14 +358,13 @@ public class CompactionAdminClient extends AbstractHoodieClient { if (!dryRun) { return jsc.parallelize(renameActions, parallelism).map(lfPair -> { try { - LOG.info("RENAME {} => {}", lfPair.getLeft().getPath(), lfPair.getRight().getPath()); + LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath()); renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight()); return new RenameOpResult(lfPair, true, Option.empty()); } catch (IOException e) { LOG.error("Error renaming log file", e); - LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. " - + "Try running \"compaction repair {} \" to recover from failure ***\n\n\n", - lfPair.getLeft().getBaseCommitTime()); + LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair " + + lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n"); return new RenameOpResult(lfPair, false, Option.of(e)); } }).collect(); @@ -396,7 +395,7 @@ public class CompactionAdminClient extends AbstractHoodieClient { HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant); if (plan.getOperations() != null) { LOG.info( - "Number of Compaction Operations :{} for instant :{}", plan.getOperations().size(), compactionInstant); + "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant); List<CompactionOperation> ops = plan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); return jsc.parallelize(ops, parallelism).flatMap(op -> { @@ -410,7 +409,7 @@ public class CompactionAdminClient extends AbstractHoodieClient { } }).collect(); } - LOG.warn("No operations for compaction instant : {}", compactionInstant); + LOG.warn("No operations for compaction instant : " + compactionInstant); return new ArrayList<>(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java index 68503c6..9411782 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java @@ -39,16 +39,16 @@ import org.apache.hudi.table.HoodieTable; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanClient.class); + private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class); private final transient HoodieMetrics metrics; public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { @@ -85,7 +85,7 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo // If there are inflight(failed) or previously requested clean operation, first perform them table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> { - LOG.info("There were previously unfinished cleaner operations. Finishing Instant={}", hoodieInstant); + LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant); runClean(table, hoodieInstant); }); @@ -122,7 +122,7 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo // Save to both aux and timeline folder try { table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan)); - LOG.info("Requesting Cleaning with instant time {}", cleanInstant); + LOG.info("Requesting Cleaning with instant time " + cleanInstant); } catch (IOException e) { LOG.error("Got exception when saving cleaner requested file", e); throw new HoodieIOException(e.getMessage(), e); @@ -173,20 +173,20 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo Option<Long> durationInMs = Option.empty(); if (context != null) { durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - LOG.info("cleanerElaspsedTime (Minutes): {}", durationInMs.get() / (1000 * 60)); + LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); } HoodieTableMetaClient metaClient = createMetaClient(true); // Create the metadata and save it HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats); - LOG.info("Cleaned {} files. Earliest Retained : {}", metadata.getTotalFilesDeleted(), metadata.getEarliestCommitToRetain()); + LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain()); metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); table.getActiveTimeline().transitionCleanInflightToComplete( new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()), AvroUtils.serializeCleanMetadata(metadata)); - LOG.info("Marked clean started on {} as complete", cleanInstant.getTimestamp()); + LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete"); return metadata; } catch (IOException e) { throw new HoodieIOException("Failed to clean up after commit", e); diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java index f309f40..3c4290c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java @@ -35,6 +35,8 @@ import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -49,8 +51,6 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -58,7 +58,7 @@ import scala.Tuple2; */ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient { - private static final Logger LOG = LoggerFactory.getLogger(HoodieReadClient.class); + private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class); /** * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 09e3f58..efb6d20 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -67,6 +67,8 @@ import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -84,8 +86,6 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -96,7 +96,7 @@ import scala.Tuple2; */ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient { - private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteClient.class); + private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class); private static final String UPDATE_STR = "update"; private static final String LOOKUP_STR = "lookup"; private final boolean rollbackPending; @@ -399,13 +399,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD, String actionType) { if (config.shouldAutoCommit()) { - LOG.info("Auto commit enabled: Committing {}", commitTime); + LOG.info("Auto commit enabled: Committing " + commitTime); boolean commitResult = commit(commitTime, resultRDD, Option.empty(), actionType); if (!commitResult) { throw new HoodieCommitException("Failed to commit " + commitTime); } } else { - LOG.info("Auto commit disabled for {}", commitTime); + LOG.info("Auto commit disabled for " + commitTime); } } @@ -454,13 +454,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) { preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); } else { - LOG.info("RDD PreppedRecords was persisted at: {}", preppedRecords.getStorageLevel()); + LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel()); } WorkloadProfile profile = null; if (hoodieTable.isWorkloadProfileNeeded()) { profile = new WorkloadProfile(preppedRecords); - LOG.info("Workload profile : {}", profile); + LOG.info("Workload profile :" + profile); saveWorkloadProfileMetadataToInflight(profile, hoodieTable, commitTime); } @@ -526,7 +526,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String actionType) { - LOG.info("Commiting {}", commitTime); + LOG.info("Commiting " + commitTime); // Create a Hoodie table which encapsulated the commits and files visible HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); @@ -573,7 +573,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo metadata, actionType); writeContext = null; } - LOG.info("Committed {}", commitTime); + LOG.info("Committed " + commitTime); } catch (IOException e) { throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime, e); @@ -607,7 +607,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp(); - LOG.info("Savepointing latest commit {}", latestCommit); + LOG.info("Savepointing latest commit " + latestCommit); return savepoint(latestCommit, user, comment); } @@ -658,7 +658,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo config.shouldAssumeDatePartitioning())) .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> { // Scan all partitions files with this commit time - LOG.info("Collecting latest files in partition path {}", partitionPath); + LOG.info("Collecting latest files in partition path " + partitionPath); ReadOptimizedView view = table.getROFileSystemView(); List<String> latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime) .map(HoodieDataFile::getFileName).collect(Collectors.toList()); @@ -672,7 +672,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo table.getActiveTimeline() .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime), AvroUtils.serializeSavepointMetadata(metadata)); - LOG.info("Savepoint {} created", commitTime); + LOG.info("Savepoint " + commitTime + " created"); return true; } catch (IOException e) { throw new HoodieSavepointException("Failed to savepoint " + commitTime, e); @@ -696,13 +696,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint); if (!isSavepointPresent) { - LOG.warn("No savepoint present {}", savepointTime); + LOG.warn("No savepoint present " + savepointTime); return; } activeTimeline.revertToInflight(savePoint); activeTimeline.deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime)); - LOG.info("Savepoint {} deleted", savepointTime); + LOG.info("Savepoint " + savepointTime + " deleted"); } /** @@ -730,7 +730,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } else { throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime); } - LOG.info("Compaction {} deleted", compactionTime); + LOG.info("Compaction " + compactionTime + " deleted"); } /** @@ -758,7 +758,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo List<String> commitsToRollback = commitTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants() .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - LOG.info("Rolling back commits {}", commitsToRollback); + LOG.info("Rolling back commits " + commitsToRollback); restoreToInstant(savepointTime); @@ -818,7 +818,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo // delete these files when it does not see a corresponding instant file under .hoodie List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant); instantsToStats.put(instant.getTimestamp(), statsForCompaction); - LOG.info("Deleted compaction instant {}", instant); + LOG.info("Deleted compaction instant " + instant); break; default: throw new IllegalArgumentException("invalid action name " + instant.getAction()); @@ -859,7 +859,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) { // nothing to rollback - LOG.info("No commits to rollback {}", commitToRollback); + LOG.info("No commits to rollback " + commitToRollback); } // Make sure only the last n commits are being rolled back @@ -880,13 +880,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true); - LOG.info("Deleted inflight commits {}", commitToRollback); + LOG.info("Deleted inflight commits " + commitToRollback); // cleanup index entries if (!index.rollbackCommit(commitToRollback)) { throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback); } - LOG.info("Index rolled back for commits {}", commitToRollback); + LOG.info("Index rolled back for commits " + commitToRollback); return stats; } @@ -907,7 +907,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo table.getActiveTimeline().saveAsComplete( new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), AvroUtils.serializeRollbackMetadata(rollbackMetadata)); - LOG.info("Commits {} rollback is complete", commitsToRollback); + LOG.info("Commits " + commitsToRollback + " rollback is complete"); if (!table.getActiveTimeline().getCleanerTimeline().empty()) { LOG.info("Cleaning up older rollback meta files"); @@ -935,7 +935,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats); table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime), AvroUtils.serializeRestoreMetadata(restoreMetadata)); - LOG.info("Commits {} rollback is complete. Restored dataset to {}", commitsToRollback, restoreToInstant); + LOG.info("Commits " + commitsToRollback + " rollback is complete. Restored dataset to " + restoreToInstant); if (!table.getActiveTimeline().getCleanerTimeline().empty()) { LOG.info("Cleaning up older restore meta files"); @@ -1027,7 +1027,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } private void startCommit(String instantTime) { - LOG.info("Generate a new instant time {}", instantTime); + LOG.info("Generate a new instant time " + instantTime); HoodieTableMetaClient metaClient = createMetaClient(true); // if there are pending compactions, their instantTime must not be greater than that of this instant time metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> { @@ -1047,7 +1047,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo */ public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException { String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Generate a new instant time {}", instantTime); + LOG.info("Generate a new instant time " + instantTime); boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata); return notEmpty ? Option.of(instantTime) : Option.empty(); } @@ -1291,9 +1291,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo + config.getBasePath() + " at time " + compactionCommitTime, e); } } - LOG.info("Compacted successfully on commit {}", compactionCommitTime); + LOG.info("Compacted successfully on commit " + compactionCommitTime); } else { - LOG.info("Compaction did not run for commit {}", compactionCommitTime); + LOG.info("Compaction did not run for commit " + compactionCommitTime); } } @@ -1304,7 +1304,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo if (finalizeCtx != null) { Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop())); durationInMs.ifPresent(duration -> { - LOG.info("Finalize write elapsed time (milliseconds): {}", duration); + LOG.info("Finalize write elapsed time (milliseconds): " + duration); metrics.updateFinalizeWriteMetrics(duration, stats.size()); }); } @@ -1344,7 +1344,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo m.forEach(metadata::addMetadata); }); - LOG.info("Committing Compaction {}. Finished with result {}", compactionCommitTime, metadata); + LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); try { diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index a958617..5afee3f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -26,9 +26,9 @@ import org.apache.hudi.common.util.NetworkUtils; import org.apache.hudi.timeline.service.TimelineService; import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.SparkConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; @@ -37,7 +37,7 @@ import java.io.IOException; */ public class EmbeddedTimelineService { - private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class); + private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class); private int serverPort; private String hostAddr; @@ -72,13 +72,13 @@ public class EmbeddedTimelineService { public void startServer() throws IOException { server = new TimelineService(0, viewManager, hadoopConf.newCopy()); serverPort = server.startService(); - LOG.info("Started embedded timeline server at {} : {}", hostAddr, serverPort); + LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort); } private void setHostAddrFromSparkConf(SparkConf sparkConf) { String hostAddr = sparkConf.get("spark.driver.host", null); if (hostAddr != null) { - LOG.info("Overriding hostIp to ({}) found in spark-conf. It was {}", hostAddr, this.hostAddr); + LOG.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr); this.hostAddr = hostAddr; } else { LOG.warn("Unable to find driver bind address from spark config"); diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java index 4c8f9c4..e3a4904 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java @@ -20,12 +20,12 @@ package org.apache.hudi.index.hbase; import org.apache.hudi.config.HoodieWriteConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAllocator { private HoodieWriteConfig hoodieWriteConfig; - private static final Logger LOG = LoggerFactory.getLogger(DefaultHBaseQPSResourceAllocator.class); + private static final Logger LOG = LogManager.getLogger(DefaultHBaseQPSResourceAllocator.class); public DefaultHBaseQPSResourceAllocator(HoodieWriteConfig hoodieWriteConfig) { this.hoodieWriteConfig = hoodieWriteConfig; @@ -46,7 +46,7 @@ public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAl @Override public void releaseQPSResources() { // Do nothing, as there are no resources locked in default implementation - LOG.info("Release QPS resources called for {} with default implementation, do nothing", - this.hoodieWriteConfig.getHbaseTableName()); + LOG.info(String.format("Release QPS resources called for %s with default implementation, do nothing", + this.hoodieWriteConfig.getHbaseTableName())); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java index 5308a10..3789bff 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -63,8 +65,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -83,7 +83,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path"); private static final int SLEEP_TIME_MILLISECONDS = 100; - private static final Logger LOG = LoggerFactory.getLogger(HBaseIndex.class); + private static final Logger LOG = LogManager.getLogger(HBaseIndex.class); private static Connection hbaseConnection = null; private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null; private float qpsFraction; @@ -115,7 +115,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { @VisibleForTesting public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) { try { - LOG.info("createQPSResourceAllocator : {}", config.getHBaseQPSResourceAllocatorClass()); + LOG.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass()); return (HBaseIndexQPSResourceAllocator) ReflectionUtils .loadClass(config.getHBaseQPSResourceAllocatorClass(), config); } catch (Exception e) { @@ -320,7 +320,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { doPutsAndDeletes(hTable, puts, deletes); } catch (Exception e) { Exception we = new Exception("Error updating index for " + writeStatus, e); - LOG.error("Error updating index for {}", writeStatus, e); + LOG.error(we); writeStatus.setGlobalError(we); } writeStatusList.add(writeStatus); @@ -370,7 +370,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { HoodieTable<T> hoodieTable) { final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); - LOG.info("multiPutBatchSize: before HBase puts {}", multiPutBatchSize); + LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); // caching the index updated status RDD writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel()); @@ -398,15 +398,15 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { this.numRegionServersForTable = getNumRegionServersAliveForTable(); final float desiredQPSFraction = hBaseIndexQPSResourceAllocator.calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable); - LOG.info("Desired QPSFraction : {}", desiredQPSFraction); - LOG.info("Number HBase puts : {}", numPuts); - LOG.info("HBase Puts Parallelism : {}", hbasePutsParallelism); + LOG.info("Desired QPSFraction :" + desiredQPSFraction); + LOG.info("Number HBase puts :" + numPuts); + LOG.info("Hbase Puts Parallelism :" + hbasePutsParallelism); final float availableQpsFraction = hBaseIndexQPSResourceAllocator.acquireQPSResources(desiredQPSFraction, numPuts); LOG.info("Allocated QPS Fraction :" + availableQpsFraction); multiPutBatchSize = putBatchSizeCalculator.getBatchSize(numRegionServersForTable, maxQpsPerRegionServer, hbasePutsParallelism, maxExecutors, SLEEP_TIME_MILLISECONDS, availableQpsFraction); - LOG.info("multiPutBatchSize : {}", multiPutBatchSize); + LOG.info("multiPutBatchSize :" + multiPutBatchSize); } } @@ -420,7 +420,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { public static class HbasePutBatchSizeCalculator implements Serializable { private static final int MILLI_SECONDS_IN_A_SECOND = 1000; - private static final Logger LOG = LoggerFactory.getLogger(HbasePutBatchSizeCalculator.class); + private static final Logger LOG = LogManager.getLogger(HbasePutBatchSizeCalculator.class); /** * Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed @@ -462,15 +462,15 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors)); int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs; int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec)); - LOG.info("HBaseIndexThrottling: qpsFraction : {}", qpsFraction); - LOG.info("HBaseIndexThrottling: numRSAlive : {}", numRSAlive); - LOG.info("HBaseIndexThrottling: maxReqPerSec : {}", maxReqPerSec); - LOG.info("HBaseIndexThrottling: numTasks : {}", numTasks); - LOG.info("HBaseIndexThrottling: maxExecutors : {}", maxExecutors); - LOG.info("HBaseIndexThrottling: maxParallelPuts : {}", maxParallelPuts); - LOG.info("HBaseIndexThrottling: maxReqsSentPerTaskPerSec : {}", maxReqsSentPerTaskPerSec); - LOG.info("HBaseIndexThrottling: numRegionServersForTable : {}", numRegionServersForTable); - LOG.info("HBaseIndexThrottling: multiPutBatchSize : {}", multiPutBatchSize); + LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction); + LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive); + LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec); + LOG.info("HbaseIndexThrottling: numTasks :" + numTasks); + LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors); + LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts); + LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec); + LOG.info("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable); + LOG.info("HbaseIndexThrottling: multiPutBatchSize :" + multiPutBatchSize); return multiPutBatchSize; } } @@ -485,7 +485,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { .toIntExact(regionLocator.getAllRegionLocations().stream().map(HRegionLocation::getServerName).distinct().count()); return numRegionServersForTable; } catch (IOException e) { - LOG.error("Error while connecting HBase:", e); + LOG.error(e); throw new RuntimeException(e); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 33e4417..edf01ce 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -47,10 +47,10 @@ import com.google.common.collect.Maps; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.TaskContext; import org.apache.spark.util.SizeEstimator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -64,7 +64,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> { - private static final Logger LOG = LoggerFactory.getLogger(HoodieAppendHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class); // This acts as the sequenceID for records written private static AtomicLong recordIndex = new AtomicLong(1); private final String fileId; @@ -123,7 +123,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri } else { // This means there is no base data file, start appending to a new log file fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId)); - LOG.info("New InsertHandle for partition : {}", partitionPath); + LOG.info("New InsertHandle for partition :" + partitionPath); } writeStatus.getStat().setPrevCommit(baseInstantTime); writeStatus.setFileId(fileId); @@ -137,7 +137,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion()); ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize()); } catch (Exception e) { - LOG.error("Error in update task at commit {}", instantTime, e); + LOG.error("Error in update task at commit " + instantTime, e); writeStatus.setGlobalError(e); throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit " + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + partitionPath, e); @@ -179,7 +179,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri hoodieRecord.deflate(); return avroRecord; } catch (Exception e) { - LOG.error("Error writing record {}", hoodieRecord, e); + LOG.error("Error writing record " + hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e, recordMetadata); } return Option.empty(); @@ -232,7 +232,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri // Not throwing exception from here, since we don't want to fail the entire job // for a single record writeStatus.markFailure(record, t, recordMetadata); - LOG.error("Error writing record {}", record, t); + LOG.error("Error writing record " + record, t); } } @@ -259,8 +259,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri runtimeStats.setTotalUpsertTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); - LOG.info("AppendHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalUpsertTime()); + LOG.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), + stat.getFileId(), runtimeStats.getTotalUpsertTime())); return writeStatus; } catch (IOException e) { @@ -308,7 +308,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) { // Recompute averageRecordSize before writing a new block and update existing value with // avg of new and old - LOG.info("AvgRecordSize => {}", averageRecordSize); + LOG.info("AvgRecordSize => " + averageRecordSize); averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2; doAppend(header); estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java index d75df4b..9c319c8 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java @@ -40,8 +40,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; @@ -62,7 +62,7 @@ import java.util.stream.Collectors; */ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanHelper.class); + private static final Logger LOG = LogManager.getLogger(HoodieCleanHelper.class); private final SyncableFileSystemView fileSystemView; private final HoodieTimeline commitTimeline; @@ -101,7 +101,8 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri if ((cleanMetadata.getEarliestCommitToRetain() != null) && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) { LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " - + "since last cleaned at {}. New Instant to retain : {}", cleanMetadata.getEarliestCommitToRetain(), newInstantToRetain); + + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() + + ". New Instant to retain : " + newInstantToRetain); return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(), HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> { @@ -126,7 +127,8 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri * single file (i.e run it with versionsRetained = 1) */ private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) { - LOG.info("Cleaning {}, retaining latest {} file versions. ", partitionPath, config.getCleanerFileVersionsRetained()); + LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained() + + " file versions. "); List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); List<String> deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints @@ -185,7 +187,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri */ private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) { int commitsRetained = config.getCleanerCommitsRetained(); - LOG.info("Cleaning {}, retaining latest {} commits. ", partitionPath, commitsRetained); + LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); List<String> deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints @@ -272,7 +274,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } - LOG.info("{} patterns used to delete in partition path: {}", deletePaths.size(), partitionPath); + LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath); return deletePaths; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java index 9baad75..bafbc8d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java @@ -54,9 +54,9 @@ import com.google.common.collect.Sets; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -72,7 +72,7 @@ import java.util.stream.Stream; */ public class HoodieCommitArchiveLog { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCommitArchiveLog.class); + private static final Logger LOG = LogManager.getLogger(HoodieCommitArchiveLog.class); private final Path archiveFilePath; private final HoodieTableMetaClient metaClient; @@ -119,9 +119,9 @@ public class HoodieCommitArchiveLog { boolean success = true; if (!instantsToArchive.isEmpty()) { this.writer = openWriter(); - LOG.info("Archiving instants {}", instantsToArchive); + LOG.info("Archiving instants " + instantsToArchive); archive(instantsToArchive); - LOG.info("Deleting archived instants {}", instantsToArchive); + LOG.info("Deleting archived instants " + instantsToArchive); success = deleteArchivedInstants(instantsToArchive); } else { LOG.info("No Instants to archive"); @@ -188,14 +188,14 @@ public class HoodieCommitArchiveLog { } private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException { - LOG.info("Deleting instants {}", archivedInstants); + LOG.info("Deleting instants " + archivedInstants); boolean success = true; for (HoodieInstant archivedInstant : archivedInstants) { Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName()); try { if (metaClient.getFs().exists(commitFile)) { success &= metaClient.getFs().delete(commitFile, false); - LOG.info("Archived and deleted instant file {}", commitFile); + LOG.info("Archived and deleted instant file " + commitFile); } } catch (IOException e) { throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e); @@ -205,7 +205,7 @@ public class HoodieCommitArchiveLog { // Remove older meta-data from auxiliary path too Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); - LOG.info("Latest Committed Instant={}", latestCommitted); + LOG.info("Latest Committed Instant=" + latestCommitted); if (latestCommitted.isPresent()) { success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); } @@ -233,7 +233,7 @@ public class HoodieCommitArchiveLog { Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName()); if (metaClient.getFs().exists(metaFile)) { success &= metaClient.getFs().delete(metaFile, false); - LOG.info("Deleted instant file in auxiliary metapath : {}", metaFile); + LOG.info("Deleted instant file in auxiliary metapath : " + metaFile); } } return success; @@ -243,7 +243,7 @@ public class HoodieCommitArchiveLog { try { HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); - LOG.info("Wrapper schema {}", wrapperSchema.toString()); + LOG.info("Wrapper schema " + wrapperSchema.toString()); List<IndexedRecord> records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { try { @@ -252,7 +252,7 @@ public class HoodieCommitArchiveLog { writeToFile(wrapperSchema, records); } } catch (Exception e) { - LOG.error("Failed to archive commits, commit file: {}", hoodieInstant.getFileName(), e); + LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e); if (this.config.isFailOnTimelineArchivingEnabled()) { throw e; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index bece881..095e0a0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -36,16 +36,16 @@ import org.apache.hudi.table.HoodieTable; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Iterator; public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCreateHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class); private final HoodieStorageWriter<IndexedRecord> storageWriter; private final Path path; @@ -73,7 +73,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri } catch (IOException e) { throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e); } - LOG.info("New CreateHandle for partition : {} with fileId {}", partitionPath, fileId); + LOG.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId); } /** @@ -120,7 +120,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri // Not throwing exception from here, since we don't want to fail the entire job // for a single record writeStatus.markFailure(record, t, recordMetadata); - LOG.error("Error writing record {}", record, t); + LOG.error("Error writing record " + record, t); } } @@ -152,7 +152,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri */ @Override public WriteStatus close() { - LOG.info("Closing the file {} as we are done with all the records {}", writeStatus.getFileId(), recordsWritten); + LOG + .info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); try { storageWriter.close(); @@ -174,8 +175,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri stat.setRuntimeStats(runtimeStats); writeStatus.setStat(stat); - LOG.info("CreateHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalCreateTime()); + LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), + stat.getFileId(), runtimeStats.getTotalCreateTime())); return writeStatus; } catch (IOException e) { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index 45472bc..9f3bdbb 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -31,8 +31,8 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.HashSet; @@ -44,7 +44,7 @@ import java.util.Set; */ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends HoodieReadHandle<T> { - private static final Logger LOG = LoggerFactory.getLogger(HoodieKeyLookupHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class); private final HoodieTableType tableType; @@ -63,7 +63,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie HoodieTimer timer = new HoodieTimer().startTimer(); this.bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(), new Path(getLatestDataFile().getPath())); - LOG.info("Read bloom filter from {} in {} ms", partitionPathFilePair, timer.endTimer()); + LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer())); } /** @@ -82,7 +82,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath, timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size())); if (LOG.isDebugEnabled()) { - LOG.debug("Keys matching for file {} => {}", filePath, foundRecordKeys); + LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys); } } } catch (Exception e) { @@ -98,7 +98,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie // check record key against bloom filter of current file & add to possible keys if needed if (bloomFilter.mightContain(recordKey)) { if (LOG.isDebugEnabled()) { - LOG.debug("Record key {} matches bloom filter in {}", recordKey, partitionPathFilePair); + LOG.debug("Record key " + recordKey + " matches bloom filter in " + partitionPathFilePair); } candidateRecordKeys.add(recordKey); } @@ -110,14 +110,15 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie */ public KeyLookupResult getLookupResult() { if (LOG.isDebugEnabled()) { - LOG.debug("#The candidate row keys for {} => {}", partitionPathFilePair, candidateRecordKeys); + LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys); } HoodieDataFile dataFile = getLatestDataFile(); List<String> matchingKeys = checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath())); - LOG.info("Total records ({}), bloom filter candidates ({})/fp({}), actual matches ({})", totalKeysChecked, - candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()); + LOG.info( + String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked, + candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size())); return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(), dataFile.getCommitTime(), matchingKeys); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 0c801e7..518b883 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -43,9 +43,9 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashSet; @@ -56,7 +56,7 @@ import java.util.Set; @SuppressWarnings("Duplicates") public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> { - private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class); private Map<String, HoodieRecord<T>> keyToNewRecords; private Set<String> writtenRecordKeys; @@ -137,7 +137,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit if (exception.isPresent() && exception.get() instanceof Throwable) { // Not throwing exception from here, since we don't want to fail the entire job for a single record writeStatus.markFailure(record, exception.get(), recordMetadata); - LOG.error("Error writing record {}", record, exception.get()); + LOG.error("Error writing record " + record, exception.get()); } else { write(record, avroRecord); } @@ -155,7 +155,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit * Extract old file path, initialize StorageWriter and WriteStatus. */ private void init(String fileId, String partitionPath, HoodieDataFile dataFileToBeMerged) { - LOG.info("partitionPath: {}, fileId to be merged: {}", partitionPath, fileId); + LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId); this.writtenRecordKeys = new HashSet<>(); writeStatus.setStat(new HoodieWriteStat()); try { @@ -171,7 +171,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); - LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath.toString(), newFilePath.toString()); + LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), + newFilePath.toString())); // file name is same for all records, in this bunch writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); @@ -186,7 +187,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit storageWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema); } catch (IOException io) { - LOG.error("Error in update task at commit {}", instantTime, io); + LOG.error("Error in update task at commit " + instantTime, io); writeStatus.setGlobalError(io); throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io); @@ -200,7 +201,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit try { // Load the new records in a map long memoryForMerge = config.getMaxMemoryPerPartitionMerge(); - LOG.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge); + LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema)); } catch (IOException io) { @@ -217,10 +218,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist keyToNewRecords.put(record.getRecordKey(), record); } - LOG.info("Number of entries in MemoryBasedMap => {}. Total size in bytes of MemoryBasedMap => {}. " - + "Number of entries in DiskBasedMap => {}. Size of file spilled to disk => {}", - ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(), ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(), - ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(), ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); + LOG.info("Number of entries in MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() + + "Total size in bytes of MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => " + + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); return partitionPath; } @@ -250,7 +253,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit hoodieRecord.deflate(); return true; } catch (Exception e) { - LOG.error("Error writing record {}", hoodieRecord, e); + LOG.error("Error writing record " + hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e, recordMetadata); } return false; @@ -292,12 +295,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit try { storageWriter.writeAvro(key, oldRecord); } catch (ClassCastException e) { - LOG.error("Schema mismatch when rewriting old record {} from file {} to file {} with writerSchema {}", - oldRecord, getOldFilePath(), newFilePath, writerSchema.toString(true)); + LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath() + + " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true)); throw new HoodieUpsertException(errMsg, e); } catch (IOException e) { - LOG.error("Failed to merge old record into new file for key {} from old file {} to new file {}", - key, getOldFilePath(), newFilePath, e); + LOG.error("Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath() + + " to new file " + newFilePath, e); throw new HoodieUpsertException(errMsg, e); } recordsWritten++; @@ -342,8 +345,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime())); - LOG.info("MergeHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalUpsertTime()); return writeStatus; } catch (IOException e) { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 50256bc..7a1939a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -36,9 +36,9 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; @@ -47,7 +47,7 @@ import java.io.IOException; */ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends HoodieIOHandle { - private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class); protected final Schema originalSchema; protected final Schema writerSchema; protected HoodieTimer timer; @@ -97,7 +97,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H protected void createMarkerFile(String partitionPath) { Path markerPath = makeNewMarkerPath(partitionPath); try { - LOG.info("Creating Marker Path={}", markerPath); + LOG.info("Creating Marker Path=" + markerPath); fs.create(markerPath, false).close(); } catch (IOException e) { throw new HoodieException("Failed to create marker file " + markerPath, e); @@ -147,7 +147,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H if (exception.isPresent() && exception.get() instanceof Throwable) { // Not throwing exception from here, since we don't want to fail the entire job for a single record writeStatus.markFailure(record, exception.get(), recordMetadata); - LOG.error("Error writing record {}", record, exception.get()); + LOG.error("Error writing record " + record, exception.get()); } else { write(record, avroRecord); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java index 1d02d4e..6f97601 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java @@ -47,13 +47,13 @@ import com.google.common.collect.Sets; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.util.AccumulatorV2; import org.apache.spark.util.LongAccumulator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -74,7 +74,7 @@ import static java.util.stream.Collectors.toList; */ public class HoodieRealtimeTableCompactor implements HoodieCompactor { - private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeTableCompactor.class); + private static final Logger LOG = LogManager.getLogger(HoodieRealtimeTableCompactor.class); // Accumulator to keep track of total log files for a dataset private AccumulatorV2<Long, Long> totalLogFiles; // Accumulator to keep track of total log file slices for a dataset @@ -92,7 +92,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); List<CompactionOperation> operations = compactionPlan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("Compactor compacting {} files", operations); + LOG.info("Compactor compacting " + operations + " files"); return jsc.parallelize(operations, operations.size()) .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator); @@ -103,8 +103,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { FileSystem fs = metaClient.getFs(); Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - LOG.info("Compacting base {} with delta files {} for commit {}", - operation.getDataFileName(), operation.getDeltaFileNames(), commitTime); + LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + + " for commit " + commitTime); // TODO - FIX THIS // Reads the entire avro file. Always only specific blocks should be read from the avro file // (failure recover). @@ -115,7 +115,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); - LOG.info("MaxMemoryPerCompaction => {}", config.getMaxMemoryPerCompaction()); + LOG.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction()); List<String> logFiles = operation.getDeltaFileNames().stream().map( p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) @@ -176,7 +176,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { // TODO : check if maxMemory is not greater than JVM or spark.executor memory // TODO - rollback any compactions in flight HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); - LOG.info("Compacting {} with commit {}", metaClient.getBasePath(), compactionCommitTime); + LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime); List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning()); @@ -189,7 +189,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { } RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); - LOG.info("Compaction looking for files to compact in {} partitions", partitionPaths); + LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size()) .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView .getLatestFileSlices(partitionPath) @@ -206,10 +206,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator()) .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); - LOG.info("Total of {} compactions are retrieved", operations.size()); - LOG.info("Total number of latest files slices {}", totalFileSlices.value()); - LOG.info("Total number of log files {}", totalLogFiles.value()); - LOG.info("Total number of file slices {}", totalFileSlices.value()); + LOG.info("Total of " + operations.size() + " compactions are retrieved"); + LOG.info("Total number of latest files slices " + totalFileSlices.value()); + LOG.info("Total number of log files " + totalLogFiles.value()); + LOG.info("Total number of file slices " + totalFileSlices.value()); // Filter the compactions with the passed in filter. This lets us choose most effective // compactions only HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, @@ -221,7 +221,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions + ", Selected workload :" + compactionPlan); if (compactionPlan.getOperations().isEmpty()) { - LOG.warn("After filtering, Nothing to compact for {}", metaClient.getBasePath()); + LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); } return compactionPlan; } diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index c0dd905..b6fcd09 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -24,15 +24,15 @@ import org.apache.hudi.config.HoodieWriteConfig; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; /** * Wrapper for metrics-related operations. */ public class HoodieMetrics { - private static final Logger LOG = LoggerFactory.getLogger(HoodieMetrics.class); + private static final Logger LOG = LogManager.getLogger(HoodieMetrics.class); // Some timers public String rollbackTimerName = null; public String cleanTimerName = null; @@ -155,7 +155,8 @@ public class HoodieMetrics { public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) { if (config.isMetricsOn()) { - LOG.info("Sending rollback metrics (duration={}, numFilesDeleted={})", durationInMs, numFilesDeleted); + LOG.info( + String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted); } @@ -163,7 +164,8 @@ public class HoodieMetrics { public void updateCleanMetrics(long durationInMs, int numFilesDeleted) { if (config.isMetricsOn()) { - LOG.info("Sending clean metrics (duration={}, numFilesDeleted={})", durationInMs, numFilesDeleted); + LOG.info( + String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted); } @@ -171,7 +173,8 @@ public class HoodieMetrics { public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) { if (config.isMetricsOn()) { - LOG.info("Sending finalize write metrics (duration={}, numFilesFinalized={})", durationInMs, numFilesFinalized); + LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs, + numFilesFinalized)); Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized); } @@ -179,7 +182,7 @@ public class HoodieMetrics { public void updateIndexMetrics(final String action, final long durationInMs) { if (config.isMetricsOn()) { - LOG.info("Sending index metrics ({}.duration, {})", action, durationInMs); + LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs)); Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java index a3e95fe..2559a4b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java @@ -22,8 +22,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; @@ -38,7 +38,7 @@ import java.rmi.registry.LocateRegistry; */ public class JmxMetricsReporter extends MetricsReporter { - private static final Logger LOG = LoggerFactory.getLogger(JmxMetricsReporter.class); + private static final Logger LOG = LogManager.getLogger(JmxMetricsReporter.class); private final JMXConnectorServer connector; public JmxMetricsReporter(HoodieWriteConfig config) { diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java index 4d00684..4b19441 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -24,8 +24,8 @@ import org.apache.hudi.exception.HoodieException; import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.google.common.io.Closeables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.Closeable; @@ -33,7 +33,7 @@ import java.io.Closeable; * This is the main class of the metrics system. */ public class Metrics { - private static final Logger LOG = LoggerFactory.getLogger(Metrics.class); + private static final Logger LOG = LogManager.getLogger(Metrics.class); private static volatile boolean initialized = false; private static Metrics metrics = null; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java index bb33a97..aac6c70 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java @@ -24,8 +24,8 @@ import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.graphite.Graphite; import com.codahale.metrics.graphite.GraphiteReporter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.Closeable; import java.net.InetSocketAddress; @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit; */ public class MetricsGraphiteReporter extends MetricsReporter { - private static final Logger LOG = LoggerFactory.getLogger(MetricsGraphiteReporter.class); + private static final Logger LOG = LogManager.getLogger(MetricsGraphiteReporter.class); private final MetricRegistry registry; private final GraphiteReporter graphiteReporter; private final HoodieWriteConfig config; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index a80c1ef..b9d433d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -21,15 +21,15 @@ package org.apache.hudi.metrics; import org.apache.hudi.config.HoodieWriteConfig; import com.codahale.metrics.MetricRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; /** * Factory class for creating MetricsReporter. */ public class MetricsReporterFactory { - private static final Logger LOG = LoggerFactory.getLogger(MetricsReporterFactory.class); + private static final Logger LOG = LogManager.getLogger(MetricsReporterFactory.class); public static MetricsReporter createReporter(HoodieWriteConfig config, MetricRegistry registry) { MetricsReporterType type = config.getMetricsReporterType(); @@ -45,7 +45,7 @@ public class MetricsReporterFactory { reporter = new JmxMetricsReporter(config); break; default: - LOG.error("Reporter type[{}] is not supported.", type); + LOG.error("Reporter type[" + type + "] is not supported."); break; } return reporter; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 1ccf026..f1f277b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -58,6 +58,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetReader; @@ -79,8 +81,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -92,7 +92,7 @@ import scala.Tuple2; */ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T> { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCopyOnWriteTable.class); + private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class); public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) { super(config, jsc); @@ -130,7 +130,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi try { boolean deleteResult = fs.delete(deletePath, false); if (deleteResult) { - LOG.debug("Cleaned file at path : {}", deletePath); + LOG.debug("Cleaned file at path :" + deletePath); } return deleteResult; } catch (FileNotFoundException fio) { @@ -172,7 +172,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi throws IOException { // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records if (!recordItr.hasNext()) { - LOG.info("Empty partition with fileId => {}", fileId); + LOG.info("Empty partition with fileId => " + fileId); return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator(); } // these are updates @@ -212,8 +212,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi // TODO(vc): This needs to be revisited if (upsertHandle.getWriteStatus().getPartitionPath() == null) { - LOG.info("Upsert Handle has partition path as null {}, {}", upsertHandle.getOldFilePath(), - upsertHandle.getWriteStatus()); + LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + + upsertHandle.getWriteStatus()); } return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); } @@ -291,7 +291,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi LOG.info("Nothing to clean here. It is already clean"); return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); } - LOG.info("Total Partitions to clean : {}, with policy {}", partitionsToClean.size(), config.getCleanerPolicy()); + LOG.info( + "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy()); int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); LOG.info("Using cleanerParallelism: " + cleanerParallelism); @@ -317,7 +318,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi int cleanerParallelism = Math.min( (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()), config.getCleanerParallelism()); - LOG.info("Using cleanerParallelism: {}", cleanerParallelism); + LOG.info("Using cleanerParallelism: " + cleanerParallelism); List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y))) @@ -353,7 +354,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); if (instant.isCompleted()) { - LOG.info("Unpublishing instant {}", instant); + LOG.info("Unpublishing instant " + instant); instant = activeTimeline.revertToInflight(instant); } @@ -363,7 +364,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi String commit = instant.getTimestamp(); // delete all the data files for this commit - LOG.info("Clean out all parquet files generated for commit: {}", commit); + LOG.info("Clean out all parquet files generated for commit: " + commit); List<RollbackRequest> rollbackRequests = generateRollbackRequests(instant); //TODO: We need to persist this as rollback workload and use it in case of partial failures @@ -371,7 +372,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi } // Delete Inflight instant if enabled deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant); - LOG.info("Time(in ms) taken to finish rollback {}", (System.currentTimeMillis() - startTime)); + LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); return stats; } @@ -397,7 +398,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi // Remove the rolled back inflight commits if (deleteInstant) { - LOG.info("Deleting instant={}", instantToBeDeleted); + LOG.info("Deleting instant=" + instantToBeDeleted); activeTimeline.deletePending(instantToBeDeleted); if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) { // Delete corresponding requested instant @@ -405,9 +406,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi instantToBeDeleted.getTimestamp()); activeTimeline.deletePending(instantToBeDeleted); } - LOG.info("Deleted pending commit {}", instantToBeDeleted); + LOG.info("Deleted pending commit " + instantToBeDeleted); } else { - LOG.warn("Rollback finished without deleting inflight instant file. Instant={}", instantToBeDeleted); + LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted); } } @@ -575,10 +576,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi assignUpdates(profile); assignInserts(profile); - LOG.info("Total Buckets :{}, buckets info => {}, \n" - + "Partition to insert buckets => {}, \n" - + "UpdateLocations mapped to buckets =>{}", - totalBuckets, bucketInfoMap, partitionPathToInsertBuckets, updateLocationToBucket); + LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n" + + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n" + + "UpdateLocations mapped to buckets =>" + updateLocationToBucket); } private void assignUpdates(WorkloadProfile profile) { @@ -606,13 +606,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi long averageRecordSize = averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), config.getCopyOnWriteRecordSizeEstimate()); - LOG.info("AvgRecordSize => {}", averageRecordSize); + LOG.info("AvgRecordSize => " + averageRecordSize); for (String partitionPath : partitionPaths) { WorkloadStat pStat = profile.getWorkloadStat(partitionPath); if (pStat.getNumInserts() > 0) { List<SmallFile> smallFiles = getSmallFiles(partitionPath); - LOG.info("For partitionPath : {} Small Files => {}", partitionPath, smallFiles); + LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); long totalUnassignedInserts = pStat.getNumInserts(); List<Integer> bucketNumbers = new ArrayList<>(); @@ -627,10 +627,10 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi int bucket; if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { bucket = updateLocationToBucket.get(smallFile.location.getFileId()); - LOG.info("Assigning {} inserts to existing update bucket {}", recordsToAppend, bucket); + LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); } else { bucket = addUpdateBucket(smallFile.location.getFileId()); - LOG.info("Assigning {} inserts to new update bucket {}", recordsToAppend, bucket); + LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); @@ -646,8 +646,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi } int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); - LOG.info("After small file assignment: unassignedInserts => {}, totalInsertBuckets => {}, " - + "recordsPerBucket => {}", totalUnassignedInserts, insertBuckets, insertRecordsPerBucket); + LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts + + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); recordsPerBucket.add(totalUnassignedInserts / insertBuckets); @@ -667,7 +667,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts(); insertBuckets.add(bkt); } - LOG.info("Total insert buckets for partition path {} => {}", partitionPath, insertBuckets); + LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); partitionPathToInsertBuckets.put(partitionPath, insertBuckets); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index 8845772..a654fcb 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -44,11 +44,11 @@ import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; @@ -77,7 +77,7 @@ import java.util.stream.Collectors; */ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T> { - private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeOnReadTable.class); + private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class); // UpsertPartitioner for MergeOnRead table type private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner; @@ -98,10 +98,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi @Override public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException { - LOG.info("Merging updates for commit {} for file {}", commitTime, fileId); + LOG.info("Merging updates for commit " + commitTime + " for file " + fileId); if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { - LOG.info("Small file corrections for updates for commit {} for file {}", commitTime, fileId); + LOG.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId); return super.handleUpdate(commitTime, fileId, recordItr); } else { HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr); @@ -124,7 +124,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi @Override public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) { - LOG.info("Checking if compaction needs to be run on {}", config.getBasePath()); + LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); Option<HoodieInstant> lastCompaction = getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); String deltaCommitsSinceTs = "0"; @@ -135,12 +135,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline() .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants(); if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) { - LOG.info("Not running compaction as only {} delta commits was found since last compaction {}. Waiting for {}", - deltaCommitsSinceLastCompaction, deltaCommitsSinceTs, config.getInlineCompactDeltaCommitMax()); + LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction + + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for " + + config.getInlineCompactDeltaCommitMax()); return new HoodieCompactionPlan(); } - LOG.info("Compacting merge on read table {}", config.getBasePath()); + LOG.info("Compacting merge on read table " + config.getBasePath()); HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor(); try { return compactor.generateCompactionPlan(jsc, this, config, instantTime, @@ -170,11 +171,11 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi long startTime = System.currentTimeMillis(); String commit = instant.getTimestamp(); - LOG.error("Rolling back instant {}", instant); + LOG.error("Rolling back instant " + instant); // Atomically un-publish all non-inflight commits if (instant.isCompleted()) { - LOG.error("Un-publishing instant {}, deleteInstants={}", instant, deleteInstants); + LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants); instant = this.getActiveTimeline().revertToInflight(instant); } @@ -190,7 +191,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi // For Requested State (like failure during index lookup), there is nothing to do rollback other than // deleting the timeline file if (!instant.isRequested()) { - LOG.info("Unpublished {}", commit); + LOG.info("Unpublished " + commit); List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instant); // TODO: We need to persist this as rollback workload and use it in case of partial failures allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests); @@ -199,7 +200,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi // Delete Inflight instants if enabled deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant); - LOG.info("Time(in ms) taken to finish rollback {}", (System.currentTimeMillis() - startTime)); + LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); return allRollbackStats; } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 5bb0ffa..d2f5715 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -52,11 +52,11 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -73,7 +73,7 @@ import java.util.stream.Stream; */ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(HoodieTable.class); + private static final Logger LOG = LogManager.getLogger(HoodieTable.class); protected final HoodieWriteConfig config; protected final HoodieTableMetaClient metaClient; @@ -324,7 +324,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs)); if (fs.exists(markerDir)) { // For append only case, we do not write to marker dir. Hence, the above check - LOG.info("Removing marker directory={}", markerDir); + LOG.info("Removing marker directory=" + markerDir); fs.delete(markerDir, true); } } catch (IOException ioe) { @@ -363,7 +363,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri invalidDataPaths.removeAll(validDataPaths); if (!invalidDataPaths.isEmpty()) { LOG.info( - "Removing duplicate data files created due to spark retries before committing. Paths={}", invalidDataPaths); + "Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths); } Map<String, List<Pair<String, String>>> groupByPartition = invalidDataPaths.stream() @@ -381,7 +381,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism()) .map(partitionWithFileList -> { final FileSystem fileSystem = metaClient.getFs(); - LOG.info("Deleting invalid data files={}", partitionWithFileList); + LOG.info("Deleting invalid data files=" + partitionWithFileList); if (partitionWithFileList.isEmpty()) { return true; } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java index 236f440..0f3297c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java @@ -36,6 +36,8 @@ import com.google.common.collect.Maps; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; @@ -46,8 +48,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -55,7 +55,7 @@ import scala.Tuple2; */ public class RollbackExecutor implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(RollbackExecutor.class); + private static final Logger LOG = LogManager.getLogger(RollbackExecutor.class); private final HoodieTableMetaClient metaClient; private final HoodieWriteConfig config; @@ -179,13 +179,13 @@ public class RollbackExecutor implements Serializable { */ private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException { - LOG.info("Cleaning path {}", partitionPath); + LOG.info("Cleaning path " + partitionPath); FileSystem fs = metaClient.getFs(); FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { boolean success = fs.delete(file.getPath(), false); results.put(file, success); - LOG.info("Delete file {} \t {}", file.getPath(), success); + LOG.info("Delete file " + file.getPath() + "\t" + success); } return results; } @@ -195,7 +195,7 @@ public class RollbackExecutor implements Serializable { */ private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException { - LOG.info("Cleaning path {}", partitionPath); + LOG.info("Cleaning path " + partitionPath); FileSystem fs = metaClient.getFs(); PathFilter filter = (path) -> { if (path.toString().contains(".parquet")) { @@ -208,7 +208,7 @@ public class RollbackExecutor implements Serializable { for (FileStatus file : toBeDeleted) { boolean success = fs.delete(file.getPath(), false); results.put(file, success); - LOG.info("Delete file {} \t {}", file.getPath(), success); + LOG.info("Delete file " + file.getPath() + "\t" + success); } return results; }