This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d9eb7b469bf [HUDI-7535] Add metrics for sourceParallelism and Refresh profile in S3/GCS (#10918)
d9eb7b469bf is described below

commit d9eb7b469bf112a0185729ec1ca98f22632755d0
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue May 14 13:51:09 2024 +0530

    [HUDI-7535] Add metrics for sourceParallelism and Refresh profile in S3/GCS (#10918)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../ingestion/HoodieIngestionMetrics.java          |  4 ++++
 .../sources/GcsEventsHoodieIncrSource.java         | 24 ++++++++++++++--------
 .../apache/hudi/utilities/sources/KafkaSource.java |  5 ++++-
 .../sources/S3EventsHoodieIncrSource.java          | 11 ++++++----
 .../sources/helpers/CloudDataFetcher.java          | 12 ++++++++---
 .../utilities/streamer/HoodieStreamerMetrics.java  | 13 ++++++++++++
 .../utilities/sources/BaseTestKafkaSource.java     |  4 ++++
 .../sources/TestGcsEventsHoodieIncrSource.java     | 15 +++++++++-----
 .../sources/TestS3EventsHoodieIncrSource.java      | 17 ++++++++-------
 9 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
index eb9b51aedb3..378ba45e3e9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java
@@ -62,5 +62,9 @@ public abstract class HoodieIngestionMetrics implements Serializable {
 
   public abstract void updateStreamerSourceNewMessageCount(String sourceMetricName, long sourceNewMessageCount);
 
+  public abstract void updateStreamerSourceParallelism(int sourceParallelism);
+
+  public abstract void updateStreamerSourceBytesToBeIngestedInSyncRound(long sourceBytesToBeIngested);
+
   public abstract void shutdown();
 }
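
The two new hooks let a source report, for each sync round, the read parallelism it settled on and the bytes it plans to ingest. A minimal usage sketch (the variables here are hypothetical; only the two method names come from this commit):

    // Report the fan-out and planned volume chosen for this sync round.
    int numPartitions = 8;           // however the source sizes its read
    long plannedBytes = 256L << 20;  // e.g. 256 MB expected in this round
    metrics.updateStreamerSourceParallelism(numPartitions);
    metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(plannedBytes);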
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index 5900ddade24..7ab8894b315 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
@@ -112,24 +113,29 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final QueryRunner queryRunner;
   private final Option<SchemaProvider> schemaProvider;
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
-
   private static final Logger LOG = LoggerFactory.getLogger(GcsEventsHoodieIncrSource.class);
 
-  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
-                                   SchemaProvider schemaProvider) {
-
+  public GcsEventsHoodieIncrSource(
+      TypedProperties props,
+      JavaSparkContext jsc,
+      SparkSession spark,
+      SchemaProvider schemaProvider,
+      HoodieIngestionMetrics metrics) {
     this(props, jsc, spark,
-        new CloudDataFetcher(props, jsc, spark),
+        new CloudDataFetcher(props, jsc, spark, metrics),
         new QueryRunner(spark, props),
         new DefaultStreamContext(schemaProvider, Option.empty())
     );
   }
 
-  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
-                                   StreamContext streamContext) {
-
+  public GcsEventsHoodieIncrSource(
+      TypedProperties props,
+      JavaSparkContext jsc,
+      SparkSession spark,
+      HoodieIngestionMetrics metrics,
+      StreamContext streamContext) {
     this(props, jsc, spark,
-        new CloudDataFetcher(props, jsc, spark),
+        new CloudDataFetcher(props, jsc, spark, metrics),
         new QueryRunner(spark, props),
         streamContext
     );
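
Both cloud sources now take HoodieIngestionMetrics through their public constructors, which keeps them compatible with the reflective factory used to instantiate sources; the testCreateSource cases further down exercise exactly this call shape:

    // Call shape taken from the tests in this commit; the variables are the test fixtures.
    Source gcsSource = UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(),
        typedProperties, jsc(), spark(), metrics,
        new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));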
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 99af1ab0086..6666ed76904 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -84,11 +84,14 @@ public abstract class KafkaSource<T> extends Source<T> {
       SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
       offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(),
           kafkaSourceProfile.getSourcePartitions(), metrics);
+      metrics.updateStreamerSourceParallelism(kafkaSourceProfile.getSourcePartitions());
+      metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(kafkaSourceProfile.getMaxSourceBytes());
       LOG.info("About to read maxEventsInSyncRound {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
           kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
           kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
     } else {
-      long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+      int minPartitions = (int) getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+      metrics.updateStreamerSourceParallelism(minPartitions);
       offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
       LOG.info("About to read sourceLimit {} in {} spark partitions from kafka for topic {} with offset ranges {}",
           sourceLimit, minPartitions, offsetGen.getTopicName(),
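
When a SourceProfile is supplied, KafkaSource now publishes the profile's partition count and byte budget; otherwise it reports the configured KAFKA_SOURCE_MIN_PARTITIONS. A sketch of a profile that would drive the first path (assuming the three methods visible in this diff make up the SourceProfile interface; values illustrative):

    SourceProfile<Long> profile = new SourceProfile<Long>() {
      @Override
      public Long getSourceSpecificContext() {
        return 1000L;        // maxEventsInSyncRound
      }
      @Override
      public int getSourcePartitions() {
        return 4;            // reported via updateStreamerSourceParallelism
      }
      @Override
      public long getMaxSourceBytes() {
        return 256L << 20;   // reported via updateStreamerSourceBytesToBeIngestedInSyncRound
      }
    };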
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 579bc5c2021..ab8c0a55bbd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
@@ -72,21 +73,23 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
+      SchemaProvider schemaProvider,
+      HoodieIngestionMetrics metrics) {
     this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props, sparkContext, sparkSession), new DefaultStreamContext(schemaProvider, Option.empty()));
+        new CloudDataFetcher(props, sparkContext, sparkSession, metrics), new DefaultStreamContext(schemaProvider, Option.empty()));
   }
 
   public S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
+      HoodieIngestionMetrics metrics,
       StreamContext streamContext) {
     this(props, sparkContext, sparkSession, new QueryRunner(sparkSession, props),
-        new CloudDataFetcher(props, sparkContext, sparkSession), streamContext);
+        new CloudDataFetcher(props, sparkContext, sparkSession, metrics), streamContext);
   }
 
-  public S3EventsHoodieIncrSource(
+  S3EventsHoodieIncrSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
index 06fb89da9a4..7fd656adb7e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 
@@ -60,14 +61,17 @@ public class CloudDataFetcher implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession) {
-    this(props, jsc, sparkSession, new CloudObjectsSelectorCommon(props));
+  private final HoodieIngestionMetrics metrics;
+
+  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, HoodieIngestionMetrics metrics) {
+    this(props, jsc, sparkSession, metrics, new CloudObjectsSelectorCommon(props));
   }
 
-  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) {
+  public CloudDataFetcher(TypedProperties props, JavaSparkContext jsc, SparkSession sparkSession, HoodieIngestionMetrics metrics, CloudObjectsSelectorCommon cloudObjectsSelectorCommon) {
     this.props = props;
     this.sparkContext = jsc;
     this.sparkSession = sparkSession;
+    this.metrics = metrics;
     this.cloudObjectsSelectorCommon = cloudObjectsSelectorCommon;
   }
 
@@ -131,7 +135,9 @@ public class CloudDataFetcher implements Serializable {
     }
     // inflate 10% for potential hoodie meta fields
     double totalSizeWithHoodieMetaFields = totalSize * 1.1;
+    metrics.updateStreamerSourceBytesToBeIngestedInSyncRound(totalSize);
     int numPartitions = (int) Math.max(Math.ceil(totalSizeWithHoodieMetaFields / bytesPerPartition), 1);
+    metrics.updateStreamerSourceParallelism(numPartitions);
     return cloudObjectsSelectorCommon.loadAsDataset(sparkSession, cloudObjectMetadata, getFileFormat(props), schemaProviderOption, numPartitions);
   }
 }
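
A worked example of the partition sizing with assumed numbers: for totalSize = 1,000,000 bytes and bytesPerPartition = 128,000, the 10% inflation gives 1,100,000 bytes, so ceil(1,100,000 / 128,000) = 9 partitions; the gauges would then record 1,000,000 bytes and a parallelism of 9:

    long totalSize = 1_000_000L;        // sum of cloud object sizes (assumed)
    long bytesPerPartition = 128_000L;  // configured target per partition (assumed)
    double inflated = totalSize * 1.1;  // +10% for hoodie meta fields -> 1,100,000.0
    int numPartitions = (int) Math.max(Math.ceil(inflated / bytesPerPartition), 1);  // 9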
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
index ab1f72185a3..c5c01bee231 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java
@@ -158,6 +158,19 @@ public class HoodieStreamerMetrics extends HoodieIngestionMetrics {
     }
   }
 
+  @Override
+  public void updateStreamerSourceParallelism(int sourceParallelism) {
+    if (writeConfig.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "sourceParallelism"), sourceParallelism);
+    }
+  }
+
+  public void updateStreamerSourceBytesToBeIngestedInSyncRound(long sourceBytesToBeIngested) {
+    if (writeConfig.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "sourceBytesToBeIngestedInSyncRound"), sourceBytesToBeIngested);
+    }
+  }
+
   @Override
   public void shutdown() {
     if (metrics != null) {
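
With metrics enabled, the two overrides surface as gauges whose names end in deltastreamer.sourceParallelism and deltastreamer.sourceBytesToBeIngestedInSyncRound (the full name depends on getMetricsName and the configured prefix, which are not shown in this diff). A hedged sketch of the standard Hudi properties that would switch this on, with an illustrative reporter choice:

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.metrics.on", "true");
    props.setProperty("hoodie.metrics.reporter.type", "GRAPHITE");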
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 34db1acdd93..3227891df5a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -55,6 +55,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -297,6 +299,8 @@ public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarne
     sendMessagesToKafka(topic, 1000, 2);
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
     assertEquals(500, fetch1.getBatch().get().count());
+    verify(metrics, times(2)).updateStreamerSourceParallelism(4);
+    verify(metrics, times(2)).updateStreamerSourceBytesToBeIngestedInSyncRound(Long.MAX_VALUE);
   }
 
   static class TestSourceProfile implements SourceProfile<Long> {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index dda205db8f8..41ab16d7bfd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -88,7 +88,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -110,6 +109,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   @Mock
   CloudObjectsSelectorCommon cloudObjectsSelectorCommon;
   @Mock
+  HoodieIngestionMetrics metrics;
+  @Mock
   SourceProfileSupplier sourceProfileSupplier;
 
   protected Option<SchemaProvider> schemaProvider;
@@ -294,18 +295,22 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class);
     verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture());
+    verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
+    List<Integer> numPartitions;
     if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues());
+      numPartitions = Arrays.asList(12, 3, 1);
     } else {
-      Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues());
+      numPartitions = Arrays.asList(23, 1);
     }
+    Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
+    Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
   }
 
   @Test
   public void testCreateSource() throws IOException {
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
     Source gcsSource = UtilHelpers.createSource(GcsEventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics,
         new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
     assertEquals(Source.SourceType.ROW, gcsSource.getSourceType());
@@ -340,7 +345,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
                              TypedProperties typedProperties) {
 
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), new CloudDataFetcher(typedProperties, jsc(), spark(), cloudObjectsSelectorCommon), queryRunner,
+        spark(), new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, cloudObjectsSelectorCommon), queryRunner,
         new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
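
Note the test pattern: one ArgumentCaptor records the partition counts handed to loadAsDataset while a second records those reported to the metrics, and asserting both against the same numPartitions list proves the gauge sees exactly the parallelism used for the read. The same Mockito idiom in isolation:

    ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
    verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(captor.capture());
    Assertions.assertEquals(numPartitions, captor.getAllValues());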
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index be26dfb1f3b..2a011cd9812 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -75,7 +75,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,7 +85,6 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -109,6 +107,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   SourceProfileSupplier sourceProfileSupplier;
   @Mock
   QueryInfo queryInfo;
+  @Mock
+  HoodieIngestionMetrics metrics;
   private JavaSparkContext jsc;
   private HoodieTableMetaClient metaClient;
 
@@ -499,19 +499,22 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
     // Verify the partitions being passed in getCloudObjectDataDF are correct.
     ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class);
     verify(mockCloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), argumentCaptor.capture());
-    List<Integer> numPartitions = Collections.emptyList();
+    verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture());
+    List<Integer> numPartitions;
     if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) {
-      Assertions.assertEquals(Arrays.asList(12, 3, 1), argumentCaptor.getAllValues());
+      numPartitions = Arrays.asList(12, 3, 1);
     } else {
-      Assertions.assertEquals(Arrays.asList(23, 1), argumentCaptor.getAllValues());
+      numPartitions = Arrays.asList(23, 1);
     }
+    Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues());
+    Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues());
   }
 
   @Test
   public void testCreateSource() throws IOException {
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
     Source s3Source = UtilHelpers.createSource(S3EventsHoodieIncrSource.class.getName(), typedProperties, jsc(), spark(), metrics,
         new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
     assertEquals(Source.SourceType.ROW, s3Source.getSourceType());
@@ -521,7 +524,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
                              TypedProperties typedProperties) {
     S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(),
-        spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), mockCloudObjectsSelectorCommon),
+        spark(), mockQueryRunner, new CloudDataFetcher(typedProperties, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon),
         new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier)));
 
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
