This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 11132dbff180b0bf3a78b0f0e0d7f82c28757312
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue May 14 13:51:09 2024 +0530

    [HUDI-7535] Add metrics for sourceParallelism and Refresh profile in S3/GCS (#10918)

    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../ingestion/HoodieIngestionMetrics.java           |  4 ++++
 .../sources/GcsEventsHoodieIncrSource.java          | 24 ++++++++++++++--------
 .../apache/hudi/utilities/sources/KafkaSource.java  |  5 ++++-
 .../sources/S3EventsHoodieIncrSource.java           | 11 ++++++----
 .../sources/helpers/CloudDataFetcher.java           | 12 ++++++++---
 .../utilities/streamer/HoodieStreamerMetrics.java   | 13 ++++++++++++
 .../utilities/sources/BaseTestKafkaSource.java      |  4 ++++
 .../sources/TestGcsEventsHoodieIncrSource.java      | 15 +++++++++-----
 .../sources/TestS3EventsHoodieIncrSource.java       | 17 ++++++++-------
 9 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
index eb9b51aedb3..378ba45e3e9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
@@ -62,5 +62,9 @@ public abstract class HoodieIngestionMetrics implements Serializable {
 
   public abstract void updateStreamerSourceNewMessageCount(String sourceMetricName, long sourceNewMessageCount);
 
+  public abstract void updateStreamerSourceParallelism(int sourceParallelism);
+
+  public abstract void updateStreamerSourceBytesToBeIngestedInSyncRound(long sourceBytesToBeIngested);
+
   public abstract void shutdown();
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 5900ddade24..7ab8894b315 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
@@ -112,24 +113,29 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final QueryRunner queryRunner;
   private final Option<SchemaProvider> schemaProvider;
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
-
   private static final Logger LOG = LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);
 
-  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
-                                   SchemaProvider schemaProvider) {
-
+  public GcsEventsHoodieIncrSource(
+      TypedProperties props,
+      JavaSparkContext jsc,
+      SparkSession spark,
+      SchemaProvider schemaProvider,
+      HoodieIngestionMetrics metrics) {
     this(props, jsc, spark,
-        new CloudDataFetcher(props, jsc, spark),
+        new CloudDataFetcher(props, jsc, spark, metrics),
         new QueryRunner(spark, props),
         new DefaultStreamContext(schemaProvider, Option.empty())
     );
   }
 
-  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
-                                   StreamContext streamContext) {
-
+  public GcsEventsHoodieIncrSource(
+      TypedProperties props,
+      JavaSparkContext jsc,
+      SparkSession spark,
+      HoodieIngestionMetrics metrics,
+      StreamContext streamContext) {
     this(props, jsc, spark,
-        new CloudDataFetcher(props, jsc, spark),
+        new CloudDataFetcher(props, jsc, spark, metrics),
         new QueryRunner(spark, props),
         streamContext
     );
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 99af1ab0086..6666ed76904 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -84,11 +84,14 @@ public abstract class KafkaSource<T> extends Source<T> {
       SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
       offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(),
          kafkaSourceProfile.getSourcePartitions(), metrics);
+      metrics.updateStreamerSourceParallelism(kafkaSourceProfile.getSourcePartitions());
+      metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(kafkaSourceProfile.getMaxSourceBytes());
       LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
           kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
           kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
     } else {
-      long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+      int minPartitions = (int) getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+      metrics.updateStreamerSourceParallelism(minPartitions);
       offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
       LOG.info("About to read sourceLimit {} in {} spark partitions from kafka for topic {} with offset ranges {}", sourceLimit, minPartitions, offsetGen.getTopicName(),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 579bc5c2021..ab8c0a55bbd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
@@ -72,21 +73,23 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
+      SchemaProvider schemaProvider,
+      HoodieIngestionMetrics metrics) {
     this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props, sparkContext, sparkSession), new DefaultStreamContext(schemaProvider, Option.empty()));
+        new CloudDataFetcher(props, sparkContext, sparkSession, metrics), new DefaultStreamContext(schemaProvider, Option.empty()));
   }
 
   public S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
+      HoodieIngestionMetrics metrics,
       StreamContext streamContext) {
     this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props, sparkContext, sparkSession), streamContext);
+        new CloudDataFetcher(props, sparkContext, sparkSession, metrics), streamContext);
   }
 
-  public S3EventsHoodieIncrSource(
+  S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 06fb89da9a4..7fd656adb7e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 
@@ -60,14 +61,17 @@ public class CloudDataFetcher implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession) {
-    this(props, jsc, sparkSession, new CloudObjectsSelectorCommon(props));
+  private final HoodieIngestionMetrics metrics;
+
+  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, HoodieIngestionMetrics metrics) {
+    this(props, jsc, sparkSession, metrics, new CloudObjectsSelectorCommon(props));
   }
 
-  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) {
+  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, HoodieIngestionMetrics metrics, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) {
     this.props = props;
     this.sparkContext = jsc;
     this.sparkSession = sparkSession;
+    this.metrics = metrics;
     this.cloudObjectsSelectorCommon = cloudObjectsSelectorCommon;
   }
 
@@ -131,7 +135,9 @@ public class CloudDataFetcher implements Serializable {
     }
     // inflate 10% for potential hoodie meta fields
     double totalSizeWithHoodieMetaFields = totalSize * 1.1;
+    metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(totalSize);
     int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
+    metrics.updateStreamerSourceParallelism(numPartitions);
     return cloudObjectsSelectorCommon.loadAsDataset(sparkSession, cloudObjectMetadata, getFileFormat(props), schemaProviderOption, numPartitions);
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
index ab1f72185a3..c5c01bee231 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
@@ -158,6 +158,19 @@ public class HoodieStreamerMetrics extends HoodieIngestionMetrics {
     }
   }
 
+  @Override
+  public void updateStreamerSourceParallelism(int sourceParallelism) {
+    if (writeConfig.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "sourceParallelism"), sourceParallelism);
+    }
+  }
+
+  public void updateStreamerSourceBytesToBeIngestedInSyncRound(long sourceBytesToBeIngested) {
+    if (writeConfig.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "sourceBytesToBeIngestedInSyncRound"), sourceBytesToBeIngested);
+    }
+  }
+
   @Override
   public void shutdown() {
     if (metrics != null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 34db1acdd93..3227891df5a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -55,6 +55,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -297,6 +299,8 @@ public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarne
     sendMessagesToKafka(topic, 1000, 2);
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
     assertEquals(500, fetch1.getBatch().get().count());
+    verify(metrics, times(2)).updateStreamerSourceParallelism(4);
+    verify(metrics, times(2)).updateStreamerSourceBytesToBeIngestedInSyncRound(Long.MAX_VALUE);
   }
 
   static class TestSourceProfile implements SourceProfile<Long> {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index dda205db8f8..41ab16d7bfd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -88,7 +88,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -110,6 +109,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   @Mock
   CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
   @Mock
+  HoodieIngestionMetrics metrics;
+  @Mock
   SourceProfileSupplier sourceProfileSupplier;
 
   protected Option<SchemaProvider> schemaProvider;
@@ -294,18 +295,22 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
 
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class);
     verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture());
+    verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
+    List<Integer> numPartitions;
     if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues());
+      numPartitions = Arrays.asList(12, 3, 1);
     } else {
-      Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues());
+      numPartitions = Arrays.asList(23, 1);
     }
+    Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
+    Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
   }
 
   @Test
   public void testCreateSource() throws IOException {
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
     Source gcsSource = UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics,
         new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
     assertEquals(Source.SourceType.ROW, gcsSource.getSourceType());
@@ -340,7 +345,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
       TypedProperties typedProperties) {
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), new CloudDataFetcher(typedProperties, jsc(), spark(), cloudObjectsSelectorCommon), queryRunner,
+        spark(), new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, cloudObjectsSelectorCommon), queryRunner,
         new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index be26dfb1f3b..2a011cd9812 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -75,7 +75,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,7 +85,6 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -109,6 +107,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   SourceProfileSupplier sourceProfileSupplier;
   @Mock
   QueryInfo queryInfo;
+  @Mock
+  HoodieIngestionMetrics metrics;
 
   private JavaSparkContext jsc;
   private HoodieTableMetaClient metaClient;
@@ -499,19 +499,22 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
 
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class);
     verify(mockCloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), argumentCaptor.capture());
-    List<Integer> numPartitions = Collections.emptyList();
+    verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
+    List<Integer> numPartitions;
     if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues());
+      numPartitions = Arrays.asList(12, 3, 1);
     } else {
-      Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues());
+      numPartitions = Arrays.asList(23, 1);
     }
+    Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
+    Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
   }
 
   @Test
   public void testCreateSource() throws IOException {
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
     Source s3Source = UtilHelpers.createSource(S3EventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics,
         new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
     assertEquals(Source.SourceType.ROW, s3Source.getSourceType());
@@ -521,7 +524,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
       Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
       TypedProperties typedProperties) {
     S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), mockCloudObjectsSelectorCommon),
+        spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon),
         new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
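
A note on the numbers the new sourceParallelism gauge reports for the S3/GCS
sources: as the CloudDataFetcher hunk above shows, the fetcher inflates the
total bytes selected for the sync round by 10% (headroom for Hudi meta
fields), ceil-divides by the per-partition byte budget, and floors the result
at one partition; the gauge records the resulting partition count, while
sourceBytesToBeIngestedInSyncRound records the pre-inflation totalSize. The
following is a minimal, self-contained restatement of that arithmetic; the
byte values in main are hypothetical, not taken from this commit's tests.

public class SourceParallelismSketch {

  // Mirrors the calculation in CloudDataFetcher: inflate by 10% for potential
  // hoodie meta fields, ceil-divide by the per-partition byte budget, and
  // never go below one partition.
  static int numPartitions(long totalSize, long bytesPerPartition) {
    double totalSizeWithHoodieMetaFields = totalSize * 1.1;
    return (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
  }

  public static void main(String[] args) {
    // Hypothetical: 500 MB selected with a 100 MB per-partition budget
    // -> ceil(550 MB / 100 MB) = 6 partitions.
    System.out.println(numPartitions(500_000_000L, 100_000_000L)); // 6
    // A tiny batch still runs with a single partition.
    System.out.println(numPartitions(1_024L, 100_000_000L)); // 1
  }
}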
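On the verification side, the updated tests capture every value reported to
updateStreamerSourceParallelism and compare the list against the partition
counts passed to loadAsDataset. Below is a stripped-down illustration of that
ArgumentCaptor pattern; IngestionMetricsStub is a hypothetical stand-in for
HoodieIngestionMetrics so the snippet runs with only Mockito on the classpath,
and the expected values 12, 3, 1 come from the assertions in the tests above.

import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import org.mockito.ArgumentCaptor;

public class CaptorPatternSketch {

  // Hypothetical stand-in for HoodieIngestionMetrics, trimmed to the one
  // method exercised here.
  interface IngestionMetricsStub {
    void updateStreamerSourceParallelism(int sourceParallelism);
  }

  public static void main(String[] args) {
    IngestionMetricsStub metrics = mock(IngestionMetricsStub.class);

    // The code under test reports the parallelism it computed in each round.
    metrics.updateStreamerSourceParallelism(12);
    metrics.updateStreamerSourceParallelism(3);
    metrics.updateStreamerSourceParallelism(1);

    // Capture all reported values and compare them as one list, as the
    // updated S3/GCS tests do.
    ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
    verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(captor.capture());
    if (!Arrays.asList(12, 3, 1).equals(captor.getAllValues())) {
      throw new AssertionError("unexpected parallelism values: " + captor.getAllValues());
    }
  }
}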
