This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
     new 19673ca  [FLINK-31700][Connectors/MongoDB] Fix MongoDB nightly CI 
failure
19673ca is described below

commit 19673caaaa260b65e5550489d731da417e74e2f7
Author: Jiabao Sun <[email protected]>
AuthorDate: Mon Apr 3 20:38:14 2023 +0800

    [FLINK-31700][Connectors/MongoDB] Fix MongoDB nightly CI failure
---
 .../writer/context/DefaultMongoSinkContext.java    |  52 +---------
 .../sink/writer/context/MongoSinkContext.java      |   5 +-
 .../mongodb/sink/writer/MongoWriterITCase.java     | 109 ++-------------------
 .../src/test/resources/archunit.properties         |   3 +
 4 files changed, 22 insertions(+), 147 deletions(-)

diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
index 4d5b381..f2958e1 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
@@ -18,15 +18,8 @@
 package org.apache.flink.connector.mongodb.sink.writer.context;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.UserCodeClassLoader;
-
-import java.util.OptionalLong;
 
 /** Default {@link MongoSinkContext} implementation. */
 @Internal
@@ -40,6 +33,11 @@ public class DefaultMongoSinkContext implements 
MongoSinkContext {
         this.writeOptions = writeOptions;
     }
 
+    @Override
+    public Sink.InitContext getInitContext() {
+        return initContext;
+    }
+
     @Override
     public long processTime() {
         return 
initContext.getProcessingTimeService().getCurrentProcessingTime();
@@ -49,44 +47,4 @@ public class DefaultMongoSinkContext implements 
MongoSinkContext {
     public MongoWriteOptions getWriteOptions() {
         return writeOptions;
     }
-
-    @Override
-    public UserCodeClassLoader getUserCodeClassLoader() {
-        return initContext.getUserCodeClassLoader();
-    }
-
-    @Override
-    public MailboxExecutor getMailboxExecutor() {
-        return initContext.getMailboxExecutor();
-    }
-
-    @Override
-    public ProcessingTimeService getProcessingTimeService() {
-        return initContext.getProcessingTimeService();
-    }
-
-    @Override
-    public int getSubtaskId() {
-        return initContext.getSubtaskId();
-    }
-
-    @Override
-    public int getNumberOfParallelSubtasks() {
-        return initContext.getNumberOfParallelSubtasks();
-    }
-
-    @Override
-    public SinkWriterMetricGroup metricGroup() {
-        return initContext.metricGroup();
-    }
-
-    @Override
-    public OptionalLong getRestoredCheckpointId() {
-        return initContext.getRestoredCheckpointId();
-    }
-
-    @Override
-    public SerializationSchema.InitializationContext 
asSerializationSchemaInitializationContext() {
-        return initContext.asSerializationSchemaInitializationContext();
-    }
 }
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
index 44efbf5..55e89d8 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
@@ -24,7 +24,10 @@ import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializat
 
 /** This context provides information for {@link MongoSerializationSchema}. */
 @PublicEvolving
-public interface MongoSinkContext extends Sink.InitContext {
+public interface MongoSinkContext {
+
+    /** Returns the current sink's init context. */
+    Sink.InitContext getInitContext();
 
     /** Returns the current process time in flink. */
     long processTime();
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index eff61e2..c071721 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -17,10 +17,7 @@
 
 package org.apache.flink.connector.mongodb.sink.writer;
 
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
 import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
@@ -28,18 +25,8 @@ import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializat
 import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
-import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.util.UserCodeClassLoader;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
@@ -63,7 +50,6 @@ import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.util.Optional;
-import java.util.OptionalLong;
 
 import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
 import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
@@ -90,7 +76,7 @@ public class MongoWriterITCase {
             MongoTestUtil.createMongoDBContainer(LOG);
 
     private static MongoClient mongoClient;
-    private static MetricListener metricListener;
+    private static TestSinkInitContext sinkInitContext;
 
     @BeforeAll
     static void beforeAll() {
@@ -106,7 +92,7 @@ public class MongoWriterITCase {
 
     @BeforeEach
     void setUp() {
-        metricListener = new MetricListener();
+        sinkInitContext = new TestSinkInitContext();
     }
 
     @Test
@@ -192,8 +178,8 @@ public class MongoWriterITCase {
 
         try (final MongoWriter<Document> writer =
                 createWriter(collection, batchSize, batchIntervalMs, 
flushOnCheckpoint)) {
-            final Optional<Counter> recordsSend =
-                    metricListener.getCounter(MetricNames.NUM_RECORDS_SEND);
+            final Counter recordsSend = 
sinkInitContext.getNumRecordsOutCounter();
+
             writer.write(buildMessage(1), null);
             // Update existing index
             writer.write(buildMessage(2, "u"), null);
@@ -202,8 +188,7 @@ public class MongoWriterITCase {
 
             writer.doBulkWrite();
 
-            assertThat(recordsSend.isPresent()).isTrue();
-            assertThat(recordsSend.get().getCount()).isEqualTo(3L);
+            assertThat(recordsSend.getCount()).isEqualTo(3L);
         }
     }
 
@@ -217,8 +202,7 @@ public class MongoWriterITCase {
 
         try (final MongoWriter<Document> writer =
                 createWriter(collection, batchSize, batchIntervalMs, 
flushOnCheckpoint)) {
-            final Optional<Gauge<Long>> currentSendTime =
-                    metricListener.getGauge("currentSendTime");
+            final Optional<Gauge<Long>> currentSendTime = 
sinkInitContext.getCurrentSendTimeGauge();
 
             assertThat(currentSendTime.isPresent()).isTrue();
             
assertThat(currentSendTime.get().getValue()).isEqualTo(Long.MAX_VALUE);
@@ -253,16 +237,13 @@ public class MongoWriterITCase {
                         .setMaxRetries(0)
                         .build();
 
-        Sink.InitContext initContext = new MockInitContext(metricListener);
-
         MongoSerializationSchema<Document> testSerializationSchema =
                 (element, context) -> {
-                    assertThat(context.getSubtaskId()).isEqualTo(0);
-                    
assertThat(context.getNumberOfParallelSubtasks()).isEqualTo(1);
+                    
assertThat(context.getInitContext().getSubtaskId()).isEqualTo(0);
                     
assertThat(context.getWriteOptions()).isEqualTo(expectOptions);
                     assertThat(context.processTime())
                             .isEqualTo(
-                                    initContext
+                                    sinkInitContext
                                             .getProcessingTimeService()
                                             .getCurrentProcessingTime());
                     return new InsertOneModel<>(element.toBsonDocument());
@@ -274,7 +255,6 @@ public class MongoWriterITCase {
                         batchSize,
                         batchIntervalMs,
                         flushOnCheckpoint,
-                        initContext,
                         testSerializationSchema)) {
             writer.write(buildMessage(1), null);
             writer.write(buildMessage(2), null);
@@ -294,7 +274,6 @@ public class MongoWriterITCase {
                 batchSize,
                 batchIntervalMs,
                 flushOnCheckpoint,
-                new MockInitContext(metricListener),
                 new UpsertSerializationSchema());
     }
 
@@ -303,7 +282,6 @@ public class MongoWriterITCase {
             int batchSize,
             long batchIntervalMs,
             boolean flushOnCheckpoint,
-            Sink.InitContext initContext,
             MongoSerializationSchema<Document> serializationSchema) {
 
         MongoConnectionOptions connectionOptions =
@@ -324,7 +302,7 @@ public class MongoWriterITCase {
                 connectionOptions,
                 writeOptions,
                 flushOnCheckpoint,
-                initContext,
+                sinkInitContext,
                 serializationSchema);
     }
 
@@ -365,71 +343,4 @@ public class MongoWriterITCase {
             }
         }
     }
-
-    private static class MockInitContext implements Sink.InitContext {
-
-        private final OperatorIOMetricGroup ioMetricGroup;
-        private final SinkWriterMetricGroup metricGroup;
-        private final ProcessingTimeService timeService;
-
-        private MockInitContext(MetricListener metricListener) {
-            this.ioMetricGroup =
-                    
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
-                            .getIOMetricGroup();
-            MetricGroup metricGroup = metricListener.getMetricGroup();
-            this.metricGroup = InternalSinkWriterMetricGroup.mock(metricGroup, 
ioMetricGroup);
-            this.timeService = new TestProcessingTimeService();
-        }
-
-        @Override
-        public UserCodeClassLoader getUserCodeClassLoader() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public MailboxExecutor getMailboxExecutor() {
-            return new SyncMailboxExecutor();
-        }
-
-        @Override
-        public ProcessingTimeService getProcessingTimeService() {
-            return timeService;
-        }
-
-        @Override
-        public int getSubtaskId() {
-            return 0;
-        }
-
-        @Override
-        public int getNumberOfParallelSubtasks() {
-            return 1;
-        }
-
-        @Override
-        public SinkWriterMetricGroup metricGroup() {
-            return metricGroup;
-        }
-
-        @Override
-        public OptionalLong getRestoredCheckpointId() {
-            return OptionalLong.empty();
-        }
-
-        @Override
-        public SerializationSchema.InitializationContext
-                asSerializationSchemaInitializationContext() {
-            return new SerializationSchema.InitializationContext() {
-                @Override
-                public MetricGroup getMetricGroup() {
-                    return metricGroup;
-                }
-
-                @Override
-                public UserCodeClassLoader getUserCodeClassLoader() {
-                    return null;
-                }
-            };
-        }
-    }
 }
diff --git a/flink-connector-mongodb/src/test/resources/archunit.properties 
b/flink-connector-mongodb/src/test/resources/archunit.properties
index 9d163eb..741bad3 100644
--- a/flink-connector-mongodb/src/test/resources/archunit.properties
+++ b/flink-connector-mongodb/src/test/resources/archunit.properties
@@ -29,3 +29,6 @@ freeze.store.default.allowStoreUpdate=true
 #freeze.refreeze=true
 
 freeze.store.default.path=archunit-violations
+
+# TRUE by default since 0.23.0, restore the old behavior by setting the 
ArchUnit property archRule.failOnEmptyShould=false
+archRule.failOnEmptyShould=false

Reply via email to