This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit aacbdc7da5b2ab848623414304657220cd2bfa59 Author: Jon Vexler <[email protected]> AuthorDate: Fri May 10 20:47:33 2024 -0400 [HUDI-7731] Fix usage of new Configuration() in production code (#11191) Co-authored-by: Jonathan Vexler <=> --- .../main/java/org/apache/hudi/client/BaseHoodieClient.java | 2 +- .../apache/hudi/client/transaction/lock/LockManager.java | 2 +- .../client/transaction/lock/metrics/HoodieLockMetrics.java | 5 +++-- .../main/java/org/apache/hudi/metrics/HoodieMetrics.java | 5 +++-- .../table/action/compact/RunCompactionActionExecutor.java | 2 +- .../hudi/table/action/index/RunIndexActionExecutor.java | 2 +- .../org/apache/hudi/metrics/TestHoodieConsoleMetrics.java | 5 ++++- .../org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java | 5 ++++- .../java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java | 5 ++++- .../java/org/apache/hudi/metrics/TestHoodieMetrics.java | 5 ++++- .../hudi/metrics/datadog/TestDatadogMetricsReporter.java | 9 ++++++--- .../test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java | 10 +++++++--- .../hudi/metrics/prometheus/TestPrometheusReporter.java | 7 +++++-- .../hudi/metrics/prometheus/TestPushGateWayReporter.java | 13 ++++++++----- .../hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java | 2 +- .../hudi/metadata/JavaHoodieBackedTableMetadataWriter.java | 2 +- .../apache/hudi/client/TestJavaHoodieBackedMetadata.java | 2 +- .../hudi/client/validator/SparkPreCommitValidator.java | 2 +- .../hudi/metadata/SparkHoodieBackedTableMetadataWriter.java | 2 +- .../hudi/client/functional/TestHoodieBackedMetadata.java | 2 +- .../apache/hudi/common/table/log/HoodieLogFormatWriter.java | 2 +- .../hudi/common/table/log/block/HoodieAvroDataBlock.java | 3 ++- .../hudi/common/table/log/block/HoodieCommandBlock.java | 3 ++- .../hudi/common/table/log/block/HoodieCorruptBlock.java | 3 ++- .../apache/hudi/common/table/log/block/HoodieDataBlock.java | 7 ++++--- .../hudi/common/table/log/block/HoodieDeleteBlock.java | 3 ++- .../hudi/common/table/log/block/HoodieHFileDataBlock.java | 4 ++-- .../apache/hudi/common/table/log/block/HoodieLogBlock.java | 2 +- .../hudi/common/table/log/block/HoodieParquetDataBlock.java | 6 ++---- .../java/org/apache/hudi/metadata/BaseTableMetadata.java | 3 ++- .../org/apache/hudi/metadata/HoodieMetadataMetrics.java | 5 +++-- .../src/main/java/org/apache/hudi/metrics/Metrics.java | 12 +++++++----- .../apache/hudi/common/functional/TestHoodieLogFormat.java | 2 +- .../hudi/common/table/log/block/TestHoodieDeleteBlock.java | 3 ++- .../procedures/RepairOverwriteHoodiePropsProcedure.scala | 2 +- .../marker/MarkerBasedEarlyConflictDetectionRunnable.java | 6 ++---- .../utilities/deltastreamer/HoodieDeltaStreamerMetrics.java | 9 +++++---- .../hudi/utilities/ingestion/HoodieIngestionMetrics.java | 10 +++++++--- .../hudi/utilities/streamer/HoodieStreamerMetrics.java | 11 ++++++----- .../java/org/apache/hudi/utilities/streamer/StreamSync.java | 8 ++++++-- 40 files changed, 118 insertions(+), 75 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index d6963f891ff..46ab6bb85ba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -98,7 +98,7 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable { this.heartbeatClient = new HoodieHeartbeatClient(storage, this.basePath, clientConfig.getHoodieClientHeartbeatIntervalInMs(), clientConfig.getHoodieClientHeartbeatTolerableMisses()); - this.metrics = new HoodieMetrics(config); + this.metrics = new HoodieMetrics(config, context.getStorageConf()); this.txnManager = new TransactionManager(config, storage); startEmbeddedServerView(); initWrapperFSMetrics(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index 9393e247565..4fcb79a588e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -66,7 +66,7 @@ public class LockManager implements Serializable, AutoCloseable { Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())); maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())); - metrics = new HoodieLockMetrics(writeConfig); + metrics = new HoodieLockMetrics(writeConfig, storageConf); } public void lock() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java index bbf3d6876d8..7a793de5392 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java @@ -26,6 +26,7 @@ import com.codahale.metrics.Timer; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metrics.Metrics; +import org.apache.hudi.storage.StorageConfiguration; import java.util.concurrent.TimeUnit; @@ -49,12 +50,12 @@ public class HoodieLockMetrics { private static final Object REGISTRY_LOCK = new Object(); private Metrics metrics; - public HoodieLockMetrics(HoodieWriteConfig writeConfig) { + public HoodieLockMetrics(HoodieWriteConfig writeConfig, StorageConfiguration<?> storageConf) { this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled(); this.writeConfig = writeConfig; if (isMetricsEnabled) { - metrics = Metrics.getInstance(writeConfig.getMetricsConfig()); + metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), storageConf); MetricRegistry registry = metrics.getRegistry(); lockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 72df6b8ce9e..82dca3c43bb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.Counter; import com.codahale.metrics.Timer; @@ -88,11 +89,11 @@ public class HoodieMetrics { private Counter compactionRequestedCounter = null; private Counter compactionCompletedCounter = null; - public HoodieMetrics(HoodieWriteConfig config) { + public HoodieMetrics(HoodieWriteConfig config, StorageConfiguration<?> storageConf) { this.config = config; this.tableName = config.getTableName(); if (config.isMetricsOn()) { - metrics = Metrics.getInstance(config.getMetricsConfig()); + metrics = Metrics.getInstance(config.getMetricsConfig(), storageConf); this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION); this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION); this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java index 055cdb5910b..55e8ce7d23f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java @@ -73,7 +73,7 @@ public class RunCompactionActionExecutor<T> extends this.operationType = operationType; checkArgument(operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.LOG_COMPACT, "Only COMPACT and LOG_COMPACT is supported"); - metrics = new HoodieMetrics(config); + metrics = new HoodieMetrics(config, context.getStorageConf()); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java index dc5ad7e27de..c971ac10646 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java @@ -100,7 +100,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, super(context, config, table, instantTime); this.txnManager = new TransactionManager(config, table.getMetaClient().getStorage()); if (config.getMetadataConfig().isMetricsEnabled()) { - this.metrics = Option.of(new HoodieMetadataMetrics(config.getMetricsConfig())); + this.metrics = Option.of(new HoodieMetadataMetrics(config.getMetricsConfig(), context.getStorageConf())); } else { this.metrics = Option.empty(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java index 43748e96833..4e938ef1cef 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java @@ -18,8 +18,10 @@ package org.apache.hudi.metrics; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -40,6 +42,7 @@ public class TestHoodieConsoleMetrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -49,7 +52,7 @@ public class TestHoodieConsoleMetrics { when(writeConfig.isMetricsOn()).thenReturn(true); when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.CONSOLE); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java index 63a6704b02f..cf488405660 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieGraphiteMetrics.java @@ -18,9 +18,11 @@ package org.apache.hudi.metrics; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.NetworkTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -43,6 +45,7 @@ public class TestHoodieGraphiteMetrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -60,7 +63,7 @@ public class TestHoodieGraphiteMetrics { when(metricsConfig.getGraphiteServerPort()).thenReturn(NetworkTestUtils.nextFreePort()); when(metricsConfig.getGraphiteReportPeriodSeconds()).thenReturn(30); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); metrics.registerGauge("graphite_metric", 123L); assertEquals("123", metrics.getRegistry().getGauges() diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java index 3b776c104cd..9daebd08661 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java @@ -18,9 +18,11 @@ package org.apache.hudi.metrics; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.NetworkTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -44,6 +46,7 @@ public class TestHoodieJmxMetrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -55,7 +58,7 @@ public class TestHoodieJmxMetrics { when(metricsConfig.getJmxHost()).thenReturn("localhost"); when(metricsConfig.getJmxPort()).thenReturn(String.valueOf(NetworkTestUtils.nextFreePort())); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java index 7b1b918535b..73b9646d577 100755 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java @@ -19,11 +19,13 @@ package org.apache.hudi.metrics; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.Timer; import org.junit.jupiter.api.AfterEach; @@ -49,6 +51,7 @@ public class TestHoodieMetrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -58,7 +61,7 @@ public class TestHoodieMetrics { when(writeConfig.isMetricsOn()).thenReturn(true); when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.INMEMORY); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java index 55637a241e2..9a7b82b4485 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java @@ -24,6 +24,7 @@ import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.MetricRegistry; import org.junit.jupiter.api.AfterEach; @@ -47,6 +48,8 @@ public class TestDatadogMetricsReporter { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + @Mock + StorageConfiguration storageConf; HoodieMetrics hoodieMetrics; Metrics metrics; @@ -70,7 +73,7 @@ public class TestDatadogMetricsReporter { when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); Throwable t = assertThrows(IllegalStateException.class, () -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); assertEquals("Datadog cannot be initialized: API key is null or empty.", t.getMessage()); @@ -86,7 +89,7 @@ public class TestDatadogMetricsReporter { when(metricsConfig.getDatadogMetricPrefix()).thenReturn(""); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); Throwable t = assertThrows(IllegalStateException.class, () -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); assertEquals("Datadog cannot be initialized: Metric prefix is null or empty.", t.getMessage()); @@ -108,7 +111,7 @@ public class TestDatadogMetricsReporter { when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn(""); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); assertDoesNotThrow(() -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java index 65c4b1d4aba..954619f6174 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java @@ -29,6 +29,8 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.storage.StorageConfiguration; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -42,6 +44,8 @@ public class TestM3Metrics { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + @Mock + StorageConfiguration storageConf; HoodieMetrics hoodieMetrics; Metrics metrics; @@ -62,7 +66,7 @@ public class TestM3Metrics { when(metricsConfig.getM3Service()).thenReturn("hoodie"); when(metricsConfig.getM3Tags()).thenReturn("tag1=value1,tag2=value2"); when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn(""); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); metrics.registerGauge("metric1", 123L); assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); @@ -80,7 +84,7 @@ public class TestM3Metrics { when(metricsConfig.getM3Service()).thenReturn("hoodie"); when(metricsConfig.getM3Tags()).thenReturn(""); when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn(""); - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); metrics.registerGauge("metric1", 123L); assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); @@ -94,7 +98,7 @@ public class TestM3Metrics { when(writeConfig.isMetricsOn()).thenReturn(true); when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn(""); assertThrows(RuntimeException.class, () -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); }); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java index 9ad2b8388a2..d95614a577a 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPrometheusReporter.java @@ -18,11 +18,13 @@ package org.apache.hudi.metrics.prometheus; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -42,6 +44,7 @@ public class TestPrometheusReporter { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -60,8 +63,8 @@ public class TestPrometheusReporter { when(metricsConfig.getPrometheusPort()).thenReturn(9090); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); assertDoesNotThrow(() -> { - new HoodieMetrics(writeConfig); - hoodieMetrics = new HoodieMetrics(writeConfig); + new HoodieMetrics(writeConfig, storageConf); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java index aa1c3f06b6f..c2c7695932d 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/prometheus/TestPushGateWayReporter.java @@ -18,6 +18,7 @@ package org.apache.hudi.metrics.prometheus; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; @@ -25,6 +26,7 @@ import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.metrics.MetricUtils; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.storage.StorageConfiguration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -34,15 +36,15 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import java.util.ArrayList; -import java.util.Map; -import java.util.UUID; import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -60,6 +62,7 @@ public class TestPushGateWayReporter { HoodieWriteConfig writeConfig; @Mock HoodieMetricsConfig metricsConfig; + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); HoodieMetrics hoodieMetrics; Metrics metrics; @@ -78,7 +81,7 @@ public class TestPushGateWayReporter { configureDefaultReporter(); assertDoesNotThrow(() -> { - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); }); @@ -103,7 +106,7 @@ public class TestPushGateWayReporter { when(metricsConfig.getMetricReporterFileBasedConfigs()).thenReturn(propPrometheusPath + "," + propDatadogPath); } - hoodieMetrics = new HoodieMetrics(writeConfig); + hoodieMetrics = new HoodieMetrics(writeConfig, storageConf); metrics = hoodieMetrics.getMetrics(); Map<String, Long> metricsMap = new HashMap<>(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 2386beab02f..2ae017b85b4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -86,7 +86,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad protected void initRegistry() { if (metadataWriteConfig.isMetricsOn()) { // should support executor metrics - this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig())); + this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf)); } else { this.metrics = Option.empty(); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java index 5f897ebecad..1c362c35e85 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java @@ -73,7 +73,7 @@ public class JavaHoodieBackedTableMetadataWriter extends HoodieBackedTableMetada @Override protected void initRegistry() { if (metadataWriteConfig.isMetricsOn()) { - this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig())); + this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf)); } else { this.metrics = Option.empty(); } diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 8c7894e4cf6..8e62d640530 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -2376,7 +2376,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { assertNoWriteErrors(writeStatuses); validateMetadata(client); - Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig()); + Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), storageConf); assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); assertTrue((Long) metrics.getRegistry().getGauges().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count").getValue() >= 1L); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java index 5288963e33b..25fae3cb6f5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java @@ -59,7 +59,7 @@ public abstract class SparkPreCommitValidator<T, I, K, O extends HoodieData<Writ this.table = table; this.engineContext = engineContext; this.writeConfig = writeConfig; - this.metrics = new HoodieMetrics(writeConfig); + this.metrics = new HoodieMetrics(writeConfig, engineContext.getStorageConf()); } protected Set<String> getPartitionsModified(HoodieWriteMetadata<O> writeResult) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index eba77604e99..8e73a52ab4c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -106,7 +106,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad } else { registry = Registry.getRegistry("HoodieMetadata"); } - this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig())); + this.metrics = Option.of(new HoodieMetadataMetrics(metadataWriteConfig.getMetricsConfig(), storageConf)); } else { this.metrics = Option.empty(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 3d5a2651575..f2f689d1bd4 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -3024,7 +3024,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertNoWriteErrors(writeStatuses); validateMetadata(client); - Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig()); + Metrics metrics = Metrics.getInstance(writeConfig.getMetricsConfig(), storageConf); assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); assertTrue(metrics.getRegistry().getGauges().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); assertTrue((Long) metrics.getRegistry().getGauges().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count").getValue() >= 1L); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index 295d4a14073..7e10d5064f9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -159,7 +159,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { // bytes for header byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader()); // content bytes - byte[] content = block.getContentBytes(); + byte[] content = block.getContentBytes(storage.getConf()); // bytes for footer byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 4153dd4c545..5a8e546734b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -98,7 +99,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { } @Override - protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException { + protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException { Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema); ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java index deeb903cd18..a519f80eb40 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.util.Option; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import java.util.HashMap; import java.util.Map; @@ -61,7 +62,7 @@ public class HoodieCommandBlock extends HoodieLogBlock { } @Override - public byte[] getContentBytes() { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) { return new byte[0]; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java index 19d704c2595..74502ee1b8b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.util.Option; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import java.io.IOException; import java.util.Map; @@ -38,7 +39,7 @@ public class HoodieCorruptBlock extends HoodieLogBlock { } @Override - public byte[] getContentBytes() throws IOException { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException { if (!getContent().isPresent() && readBlockLazily) { // read content from disk inflate(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 22dfdd4e7ea..6d75ce40355 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.avro.Schema; @@ -105,7 +106,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { } @Override - public byte[] getContentBytes() throws IOException { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException { // In case this method is called before realizing records from content Option<byte[]> content = getContent(); @@ -115,7 +116,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { return content.get(); } - return serializeRecords(records.get()); + return serializeRecords(records.get(), storageConf); } protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) { @@ -187,7 +188,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { ); } - protected abstract byte[] serializeRecords(List<HoodieRecord> records) throws IOException; + protected abstract byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException; protected abstract <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] content, HoodieRecordType type) throws IOException; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java index 1639b835ab6..aa4432ab7e4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SerializationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.SeekableDataInputStream; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.util.Lazy; import org.apache.avro.io.BinaryDecoder; @@ -87,7 +88,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock { } @Override - public byte[] getContentBytes() throws IOException { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException { Option<byte[]> content = getContent(); // In case this method is called before realizing keys from content diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index b875889e7b9..219fa2dc1c7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -118,14 +118,14 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { } @Override - protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException { + protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException { HFileContext context = new HFileContextBuilder() .withBlockSize(DEFAULT_BLOCK_SIZE) .withCompression(compressionAlgorithm.get()) .withCellComparator(ReflectionUtils.loadClass(KV_COMPARATOR_CLASS_NAME)) .build(); - Configuration conf = new Configuration(); + Configuration conf = storageConf.unwrapAs(Configuration.class); CacheConfig cacheConfig = new CacheConfig(conf); ByteArrayOutputStream baos = new ByteArrayOutputStream(); FSDataOutputStream ostream = new FSDataOutputStream(baos, null); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java index ad07be8de7f..70a04d594d1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java @@ -85,7 +85,7 @@ public abstract class HoodieLogBlock { } // Return the bytes representation of the data belonging to a LogBlock - public byte[] getContentBytes() throws IOException { + public byte[] getContentBytes(StorageConfiguration<?> storageConf) throws IOException { throw new HoodieException("No implementation was provided"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index aca30456b17..28c025c9020 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -28,13 +28,11 @@ import org.apache.hudi.io.SeekableDataInputStream; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; -import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.inline.InLineFSUtils; import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -98,7 +96,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { } @Override - protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException { + protected byte[] serializeRecords(List<HoodieRecord> records, StorageConfiguration<?> storageConf) throws IOException { if (records.size() == 0) { return new byte[0]; } @@ -116,7 +114,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { HoodieFileWriter parquetWriter = null; try { parquetWriter = HoodieFileWriterFactory.getFileWriter( - HoodieFileFormat.PARQUET, outputStream, HoodieStorageUtils.getStorageConf(new Configuration()), + HoodieFileFormat.PARQUET, outputStream, storageConf, config, writerSchema, recordType); for (HoodieRecord<?> record : records) { String recordKey = getRecordKey(record).orElse(null); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index eed5c3a03b0..f9e8bf2b7c4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -98,7 +98,8 @@ public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata { this.isMetadataTableInitialized = dataMetaClient.getTableConfig().isMetadataTableAvailable(); if (metadataConfig.isMetricsEnabled()) { - this.metrics = Option.of(new HoodieMetadataMetrics(HoodieMetricsConfig.newBuilder().fromProperties(metadataConfig.getProps()).build())); + this.metrics = Option.of(new HoodieMetadataMetrics(HoodieMetricsConfig.newBuilder() + .fromProperties(metadataConfig.getProps()).build(), getStorageConf())); } else { this.metrics = Option.empty(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java index 970ad0743f4..fce32753883 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java @@ -27,6 +27,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metrics.HoodieGauge; import org.apache.hudi.metrics.Metrics; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.MetricRegistry; import org.slf4j.Logger; @@ -80,8 +81,8 @@ public class HoodieMetadataMetrics implements Serializable { private final transient MetricRegistry metricsRegistry; private final transient Metrics metrics; - public HoodieMetadataMetrics(HoodieMetricsConfig metricsConfig) { - this.metrics = Metrics.getInstance(metricsConfig); + public HoodieMetadataMetrics(HoodieMetricsConfig metricsConfig, StorageConfiguration<?> storageConf) { + this.metrics = Metrics.getInstance(metricsConfig, storageConf); this.metricsRegistry = metrics.getRegistry(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java index af32248eea1..cc50d3a4147 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -25,10 +25,10 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import com.codahale.metrics.MetricRegistry; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +53,10 @@ public class Metrics { private final String basePath; private boolean initialized = false; private transient Thread shutdownThread = null; + private final StorageConfiguration<?> storageConf; - public Metrics(HoodieMetricsConfig metricConfig) { + public Metrics(HoodieMetricsConfig metricConfig, StorageConfiguration<?> storageConf) { + this.storageConf = storageConf; registry = new MetricRegistry(); commonMetricPrefix = metricConfig.getMetricReporterMetricsNamePrefix(); reporters = new ArrayList<>(); @@ -78,13 +80,13 @@ public class Metrics { registerGauges(Registry.getAllMetrics(true, true), Option.of(commonMetricPrefix)); } - public static synchronized Metrics getInstance(HoodieMetricsConfig metricConfig) { + public static synchronized Metrics getInstance(HoodieMetricsConfig metricConfig, StorageConfiguration<?> storageConf) { String basePath = getBasePath(metricConfig); if (METRICS_INSTANCE_PER_BASEPATH.containsKey(basePath)) { return METRICS_INSTANCE_PER_BASEPATH.get(basePath); } - Metrics metrics = new Metrics(metricConfig); + Metrics metrics = new Metrics(metricConfig, storageConf); METRICS_INSTANCE_PER_BASEPATH.put(basePath, metrics); return metrics; } @@ -98,7 +100,7 @@ public class Metrics { private List<MetricsReporter> addAdditionalMetricsExporters(HoodieMetricsConfig metricConfig) { List<MetricsReporter> reporterList = new ArrayList<>(); List<String> propPathList = StringUtils.split(metricConfig.getMetricReporterFileBasedConfigs(), ","); - try (HoodieStorage storage = HoodieStorageUtils.getStorage(propPathList.get(0), new Configuration())) { + try (HoodieStorage storage = HoodieStorageUtils.getStorage(propPathList.get(0), storageConf)) { for (String propPath : propPathList) { HoodieMetricsConfig secondarySourceConfig = HoodieMetricsConfig.newBuilder().fromInputStream( storage.open(new StoragePath(propPath))).withPath(metricConfig.getBasePath()).build(); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 3713950eb2b..ef699cd4937 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -432,7 +432,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); - byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes(); + byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes(storage.getConf()); HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation( HoodieTestUtils.getDefaultStorageConfWithDefaults(), null, 0, dataBlockContentBytes.length, 0); HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes), false, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java similarity index 97% rename from hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java rename to hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java index ccba018e64f..2e46b93d4b5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.junit.jupiter.api.Test; @@ -117,7 +118,7 @@ public class TestHoodieDeleteBlock { public void testDeleteBlockWithValidation(DeleteRecord[] deleteRecords) throws IOException { HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords, new HashMap<>()); - byte[] contentBytes = deleteBlock.getContentBytes(); + byte[] contentBytes = deleteBlock.getContentBytes(HoodieTestUtils.getDefaultStorageConf()); HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock( Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(), new HashMap<>()); DeleteRecord[] deserializedDeleteRecords = deserializeDeleteBlock.getRecordsToDelete(); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala index c7e3110b6cd..07b4992dbc8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala @@ -51,7 +51,7 @@ class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with ProcedureBu def outputType: StructType = OUTPUT_TYPE def loadNewProps(filePath: String, props: Properties):Unit = { - val fs = HadoopFSUtils.getFs(filePath, new Configuration()) + val fs = HadoopFSUtils.getFs(filePath, spark.sessionState.newHadoopConf()) val fis = fs.open(new Path(filePath)) props.load(fis) diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java index 6509e8d7e0c..11213b56e26 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java @@ -25,12 +25,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.timeline.service.handlers.MarkerHandler; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +93,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable implements Runnable { List<StoragePath> instants = MarkerUtils.getAllMarkerDir(tempPath, storage); HoodieTableMetaClient metaClient = - HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConf(new Configuration())).setBasePath(basePath) + HoodieTableMetaClient.builder().setConf(storage.getConf().newInstance()).setBasePath(basePath) .setLoadActiveTimelineOnLoad(true).build(); HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -104,7 +102,7 @@ public class MarkerBasedEarlyConflictDetectionRunnable implements Runnable { storage, basePath); Set<String> tableMarkers = candidate.stream().flatMap(instant -> { return MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(instant, storage, - new HoodieLocalEngineContext(HadoopFSUtils.getStorageConf(new Configuration())), 100) + new HoodieLocalEngineContext(storage.getConf().newInstance()), 100) .values().stream().flatMap(Collection::stream); }).collect(Collectors.toSet()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index cd7867edf3e..1dd008da237 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -21,6 +21,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics; /** @@ -30,11 +31,11 @@ import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics; @Deprecated public class HoodieDeltaStreamerMetrics extends HoodieStreamerMetrics { - public HoodieDeltaStreamerMetrics(HoodieWriteConfig writeConfig) { - super(writeConfig.getMetricsConfig()); + public HoodieDeltaStreamerMetrics(HoodieWriteConfig writeConfig, StorageConfiguration<?> storageConf) { + super(writeConfig.getMetricsConfig(), storageConf); } - public HoodieDeltaStreamerMetrics(HoodieMetricsConfig metricsConfig) { - super(metricsConfig); + public HoodieDeltaStreamerMetrics(HoodieMetricsConfig metricsConfig, StorageConfiguration<?> storageConf) { + super(metricsConfig, storageConf); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java index 3d07610993d..eb9b51aedb3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.ingestion; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; +import org.apache.hudi.storage.StorageConfiguration; import com.codahale.metrics.Timer; @@ -30,14 +31,17 @@ import java.io.Serializable; */ public abstract class HoodieIngestionMetrics implements Serializable { + protected final StorageConfiguration<?> storageConf; + protected final HoodieMetricsConfig writeConfig; - public HoodieIngestionMetrics(HoodieWriteConfig writeConfig) { - this(writeConfig.getMetricsConfig()); + public HoodieIngestionMetrics(HoodieWriteConfig writeConfig, StorageConfiguration<?> storageConf) { + this(writeConfig.getMetricsConfig(), storageConf); } - public HoodieIngestionMetrics(HoodieMetricsConfig writeConfig) { + public HoodieIngestionMetrics(HoodieMetricsConfig writeConfig, StorageConfiguration<?> storageConf) { this.writeConfig = writeConfig; + this.storageConf = storageConf; } public abstract Timer.Context getOverallTimerContext(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java index fcbf431ed6f..ab1f72185a3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java @@ -22,6 +22,7 @@ package org.apache.hudi.utilities.streamer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.metrics.Metrics; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; import com.codahale.metrics.Timer; @@ -37,14 +38,14 @@ public class HoodieStreamerMetrics extends HoodieIngestionMetrics { private transient Timer hiveSyncTimer; private transient Timer metaSyncTimer; - public HoodieStreamerMetrics(HoodieWriteConfig writeConfig) { - this(writeConfig.getMetricsConfig()); + public HoodieStreamerMetrics(HoodieWriteConfig writeConfig, StorageConfiguration<?> storageConf) { + this(writeConfig.getMetricsConfig(), storageConf); } - public HoodieStreamerMetrics(HoodieMetricsConfig writeConfig) { - super(writeConfig); + public HoodieStreamerMetrics(HoodieMetricsConfig writeConfig, StorageConfiguration<?> storageConf) { + super(writeConfig, storageConf); if (writeConfig.isMetricsOn()) { - metrics = Metrics.getInstance(writeConfig); + metrics = Metrics.getInstance(writeConfig, storageConf); this.overallTimerName = getMetricsName("timer", "deltastreamer"); this.hiveSyncTimerName = getMetricsName("timer", "deltastreamerHiveSync"); this.metaSyncTimerName = getMetricsName("timer", "deltastreamerMetaSync"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 90f3a17c957..87712243bd7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -63,6 +63,7 @@ import org.apache.hudi.config.HoodieErrorTableConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetaSyncException; @@ -75,6 +76,7 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.util.SyncUtilHelpers; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -310,8 +312,10 @@ public class StreamSync implements Serializable, Closeable { this.conf = conf; HoodieWriteConfig hoodieWriteConfig = getHoodieClientConfig(); - this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass, hoodieWriteConfig.getMetricsConfig()); - this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig); + this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass, + new Class<?>[] { HoodieMetricsConfig.class, StorageConfiguration.class}, + hoodieWriteConfig.getMetricsConfig(), storage.getConf()); + this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig, storage.getConf()); if (props.getBoolean(ERROR_TABLE_ENABLED.key(), ERROR_TABLE_ENABLED.defaultValue())) { this.errorTableWriter = ErrorTableUtils.getErrorTableWriter( cfg, sparkSession, props, hoodieSparkContext, storage);
