This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new d9eb7b469bf [HUDI-7535] Add metrics for sourceParallelism and Refresh profile in S3/GCS (#10918)
d9eb7b469bf is described below
commit d9eb7b469bf112a0185729ec1ca98f22632755d0
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue May 14 13:51:09 2024 +0530
    [HUDI-7535] Add metrics for sourceParallelism and Refresh profile in S3/GCS (#10918)
Co-authored-by: Y Ethan Guo <[email protected]>
---
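Note on the change, for readers of this patch: the two hooks added to HoodieIngestionMetrics below are plain per-sync-round gauges; each source reports how many Spark partitions it will read with and how many bytes it expects to ingest, and the shipped implementation (HoodieStreamerMetrics, further down in this diff) registers them via metrics.registerGauge. A minimal self-contained sketch of the semantics follows; the class and variable names here are illustrative only, not part of the patch:

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for the two hooks added to HoodieIngestionMetrics.
    // Each sync round simply overwrites the gauges with the latest values.
    public class SourceRoundGauges {
      private final AtomicInteger sourceParallelism = new AtomicInteger();
      private final AtomicLong sourceBytesToBeIngested = new AtomicLong();

      public void updateStreamerSourceParallelism(int parallelism) {
        sourceParallelism.set(parallelism);
      }

      public void updateStreamerSourceBytesToBeIngestedInSyncRound(long bytes) {
        sourceBytesToBeIngested.set(bytes);
      }

      public static void main(String[] args) {
        SourceRoundGauges gauges = new SourceRoundGauges();
        // Mirrors the CloudDataFetcher logic in this patch: 3 GB of cloud files
        // at 1 GB per partition, inflated 10% for hoodie meta fields,
        // gives ceil(3.3) = 4 partitions.
        long totalSize = 3L * 1024 * 1024 * 1024;
        long bytesPerPartition = 1024L * 1024 * 1024;
        int numPartitions = (int) Math.max(Math.ceil(totalSize * 1.1 / bytesPerPartition), 1);
        gauges.updateStreamerSourceBytesToBeIngestedInSyncRound(totalSize);
        gauges.updateStreamerSourceParallelism(numPartitions); // reports 4
      }
    }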
.../ingestion/HoodieIngestionMetrics.java | 4 ++++
.../sources/GcsEventsHoodieIncrSource.java | 24 ++++++++++++++--------
.../apache/hudi/utilities/sources/KafkaSource.java | 5 ++++-
.../sources/S3EventsHoodieIncrSource.java | 11 ++++++----
.../sources/helpers/CloudDataFetcher.java | 12 ++++++++---
.../utilities/streamer/HoodieStreamerMetrics.java | 13 ++++++++++++
.../utilities/sources/BaseTestKafkaSource.java | 4 ++++
.../sources/TestGcsEventsHoodieIncrSource.java | 15 +++++++++-----
.../sources/TestS3EventsHoodieIncrSource.java | 17 ++++++++-------
9 files changed, 76 insertions(+), 29 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
index eb9b51aedb3..378ba45e3e9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
@@ -62,5 +62,9 @@ public abstract class HoodieIngestionMetrics implements Serializable {
  public abstract void updateStreamerSourceNewMessageCount(String sourceMetricName, long sourceNewMessageCount);
+ public abstract void updateStreamerSourceParallelism(int sourceParallelism);
+
+ public abstract void updateStreamerSourceBytesToBeIngestedInSyncRound(long sourceBytesToBeIngested);
+
public abstract void shutdown();
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 5900ddade24..7ab8894b315 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
@@ -112,24 +113,29 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
private final QueryRunner queryRunner;
private final Option<SchemaProvider> schemaProvider;
private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
-
  private static final Logger LOG = LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);
-  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
-      SchemaProvider schemaProvider) {
-
+ public GcsEventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext jsc,
+ SparkSession spark,
+ SchemaProvider schemaProvider,
+ HoodieIngestionMetrics metrics) {
this(props, jsc, spark,
- new CloudDataFetcher(props, jsc, spark),
+ new CloudDataFetcher(props, jsc, spark, metrics),
new QueryRunner(spark, props),
new DefaultStreamContext(schemaProvider, Option.empty())
);
}
-  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
-      StreamContext streamContext) {
-
+ public GcsEventsHoodieIncrSource(
+ TypedProperties props,
+ JavaSparkContext jsc,
+ SparkSession spark,
+ HoodieIngestionMetrics metrics,
+ StreamContext streamContext) {
this(props, jsc, spark,
- new CloudDataFetcher(props, jsc, spark),
+ new CloudDataFetcher(props, jsc, spark, metrics),
new QueryRunner(spark, props),
streamContext
);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 99af1ab0086..6666ed76904 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -84,11 +84,14 @@ public abstract class KafkaSource<T> extends Source<T> {
      SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
      offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getSourcePartitions(), metrics);
+      metrics.updateStreamerSourceParallelism(kafkaSourceProfile.getSourcePartitions());
+      metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(kafkaSourceProfile.getMaxSourceBytes());
LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {}
partitions from Kafka for topic {} with offsetRanges {}",
kafkaSourceProfile.getSourceSpecificContext(),
kafkaSourceProfile.getMaxSourceBytes(),
kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(),
offsetRanges);
} else {
-      long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+      int minPartitions = (int) getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+ metrics.updateStreamerSourceParallelism(minPartitions);
      offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
LOG.info("About to read sourceLimit {} in {} spark partitions from kafka
for topic {} with offset ranges {}",
sourceLimit, minPartitions, offsetGen.getTopicName(),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 579bc5c2021..ab8c0a55bbd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
@@ -72,21 +73,23 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
- SchemaProvider schemaProvider) {
+ SchemaProvider schemaProvider,
+ HoodieIngestionMetrics metrics) {
    this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props, sparkContext, sparkSession), new DefaultStreamContext(schemaProvider, Option.empty()));
+        new CloudDataFetcher(props, sparkContext, sparkSession, metrics), new DefaultStreamContext(schemaProvider, Option.empty()));
}
public S3EventsHoodieIncrSource(
TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
+ HoodieIngestionMetrics metrics,
StreamContext streamContext) {
    this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props, sparkContext, sparkSession), streamContext);
+        new CloudDataFetcher(props, sparkContext, sparkSession, metrics), streamContext);
}
- public S3EventsHoodieIncrSource(
+ S3EventsHoodieIncrSource(
TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 06fb89da9a4..7fd656adb7e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
@@ -60,14 +61,17 @@ public class CloudDataFetcher implements Serializable {
private static final long serialVersionUID = 1L;
-  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession) {
-    this(props, jsc, sparkSession, new CloudObjectsSelectorCommon(props));
+ private final HoodieIngestionMetrics metrics;
+
+  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, HoodieIngestionMetrics metrics) {
+    this(props, jsc, sparkSession, metrics, new CloudObjectsSelectorCommon(props));
}
-  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) {
+  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, HoodieIngestionMetrics metrics, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) {
this.props = props;
this.sparkContext = jsc;
this.sparkSession = sparkSession;
+ this.metrics = metrics;
this.cloudObjectsSelectorCommon = cloudObjectsSelectorCommon;
}
@@ -131,7 +135,9 @@ public class CloudDataFetcher implements Serializable {
}
// inflate 10% for potential hoodie meta fields
double totalSizeWithHoodieMetaFields = totalSize * 1.1;
+ metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(totalSize);
    int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
+ metrics.updateStreamerSourceParallelism(numPartitions);
    return cloudObjectsSelectorCommon.loadAsDataset(sparkSession, cloudObjectMetadata, getFileFormat(props), schemaProviderOption, numPartitions);
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
index ab1f72185a3..c5c01bee231 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
@@ -158,6 +158,19 @@ public class HoodieStreamerMetrics extends HoodieIngestionMetrics {
}
}
+ @Override
+ public void updateStreamerSourceParallelism(int sourceParallelism) {
+ if (writeConfig.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "sourceParallelism"), sourceParallelism);
+ }
+ }
+
+  public void updateStreamerSourceBytesToBeIngestedInSyncRound(long sourceBytesToBeIngested) {
+ if (writeConfig.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "sourceBytesToBeIngestedInSyncRound"), sourceBytesToBeIngested);
+ }
+ }
+
@Override
public void shutdown() {
if (metrics != null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 34db1acdd93..3227891df5a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -55,6 +55,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -297,6 +299,8 @@ public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarne
sendMessagesToKafka(topic, 1000, 2);
    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(500, fetch1.getBatch().get().count());
+ verify(metrics, times(2)).updateStreamerSourceParallelism(4);
+    verify(metrics, times(2)).updateStreamerSourceBytesToBeIngestedInSyncRound(Long.MAX_VALUE);
}
static class TestSourceProfile implements SourceProfile<Long> {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index dda205db8f8..41ab16d7bfd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -88,7 +88,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -110,6 +109,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
@Mock
CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
@Mock
+ HoodieIngestionMetrics metrics;
+ @Mock
SourceProfileSupplier sourceProfileSupplier;
protected Option<SchemaProvider> schemaProvider;
@@ -294,18 +295,22 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
// Verify the partitions being passed in getCloudObjectDataDF are correct.
    ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class);
    verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture());
+    verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
+ List<Integer> numPartitions;
if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues());
+ numPartitions = Arrays.asList(12, 3, 1);
} else {
-      Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues());
+ numPartitions = Arrays.asList(23, 1);
}
+ Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
+    Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
}
@Test
public void testCreateSource() throws IOException {
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
- HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
    Source gcsSource = UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics,
        new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
assertEquals(Source.SourceType.ROW, gcsSource.getSourceType());
@@ -340,7 +345,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
TypedProperties typedProperties) {
    GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), new CloudDataFetcher(typedProperties, jsc(), spark(), cloudObjectsSelectorCommon), queryRunner,
+        spark(), new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, cloudObjectsSelectorCommon), queryRunner,
        new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index be26dfb1f3b..2a011cd9812 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -75,7 +75,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,7 +85,6 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -109,6 +107,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
SourceProfileSupplier sourceProfileSupplier;
@Mock
QueryInfo queryInfo;
+ @Mock
+ HoodieIngestionMetrics metrics;
private JavaSparkContext jsc;
private HoodieTableMetaClient metaClient;
@@ -499,19 +499,22 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
// Verify the partitions being passed in getCloudObjectDataDF are correct.
    ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class);
    verify(mockCloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), argumentCaptor.capture());
- List<Integer> numPartitions = Collections.emptyList();
+    verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
+ List<Integer> numPartitions;
if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues());
+ numPartitions = Arrays.asList(12, 3, 1);
} else {
-      Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues());
+ numPartitions = Arrays.asList(23, 1);
}
+ Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
+    Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
}
@Test
public void testCreateSource() throws IOException {
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
- HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
    Source s3Source = UtilHelpers.createSource(S3EventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics,
        new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
assertEquals(Source.SourceType.ROW, s3Source.getSourceType());
@@ -521,7 +524,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
      Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
TypedProperties typedProperties) {
    S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), mockCloudObjectsSelectorCommon),
+        spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon),
        new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);