This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
The following commit(s) were added to refs/heads/main by this push:
new 19673ca [FLINK-31700][Connectors/MongoDB] Fix MongoDB nightly CI failure
19673ca is described below
commit 19673caaaa260b65e5550489d731da417e74e2f7
Author: Jiabao Sun <[email protected]>
AuthorDate: Mon Apr 3 20:38:14 2023 +0800
[FLINK-31700][Connectors/MongoDB] Fix MongoDB nightly CI failure
---
.../writer/context/DefaultMongoSinkContext.java | 52 +---------
.../sink/writer/context/MongoSinkContext.java | 5 +-
.../mongodb/sink/writer/MongoWriterITCase.java | 109 ++-------------------
.../src/test/resources/archunit.properties | 3 +
4 files changed, 22 insertions(+), 147 deletions(-)
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
index 4d5b381..f2958e1 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
@@ -18,15 +18,8 @@
package org.apache.flink.connector.mongodb.sink.writer.context;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.UserCodeClassLoader;
-
-import java.util.OptionalLong;
/** Default {@link MongoSinkContext} implementation. */
@Internal
@@ -40,6 +33,11 @@ public class DefaultMongoSinkContext implements MongoSinkContext {
this.writeOptions = writeOptions;
}
+ @Override
+ public Sink.InitContext getInitContext() {
+ return initContext;
+ }
+
@Override
public long processTime() {
return initContext.getProcessingTimeService().getCurrentProcessingTime();
@@ -49,44 +47,4 @@ public class DefaultMongoSinkContext implements MongoSinkContext {
public MongoWriteOptions getWriteOptions() {
return writeOptions;
}
-
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return initContext.getUserCodeClassLoader();
- }
-
- @Override
- public MailboxExecutor getMailboxExecutor() {
- return initContext.getMailboxExecutor();
- }
-
- @Override
- public ProcessingTimeService getProcessingTimeService() {
- return initContext.getProcessingTimeService();
- }
-
- @Override
- public int getSubtaskId() {
- return initContext.getSubtaskId();
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return initContext.getNumberOfParallelSubtasks();
- }
-
- @Override
- public SinkWriterMetricGroup metricGroup() {
- return initContext.metricGroup();
- }
-
- @Override
- public OptionalLong getRestoredCheckpointId() {
- return initContext.getRestoredCheckpointId();
- }
-
- @Override
- public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
- return initContext.asSerializationSchemaInitializationContext();
- }
}
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
index 44efbf5..55e89d8 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
@@ -24,7 +24,10 @@ import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializat
/** This context provides information for {@link MongoSerializationSchema}. */
@PublicEvolving
-public interface MongoSinkContext extends Sink.InitContext {
+public interface MongoSinkContext {
+
+ /** Returns the current sink's init context. */
+ Sink.InitContext getInitContext();
/** Returns the current process time in flink. */
long processTime();
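
[Editor's context: MongoSinkContext now exposes the underlying Sink.InitContext by composition instead of extending it, so callers reach subtask and timing information through getInitContext(). A minimal sketch of a serialization schema written against the new accessor, assuming the serialize(IN, MongoSinkContext) method of MongoSerializationSchema; the class name and the "subtask" field are illustrative, not part of this commit.]

    import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
    import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;

    import com.mongodb.client.model.InsertOneModel;
    import com.mongodb.client.model.WriteModel;

    import org.bson.BsonDocument;
    import org.bson.BsonInt32;

    /** Hypothetical schema that tags each document with the writing subtask. */
    public class SubtaskTaggingSchema implements MongoSerializationSchema<BsonDocument> {
        @Override
        public WriteModel<BsonDocument> serialize(BsonDocument element, MongoSinkContext context) {
            // The init context is reached via the accessor rather than
            // inherited from Sink.InitContext as before this change.
            int subtaskId = context.getInitContext().getSubtaskId();
            return new InsertOneModel<>(element.append("subtask", new BsonInt32(subtaskId)));
        }
    }
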
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index eff61e2..c071721 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -17,10 +17,7 @@
package org.apache.flink.connector.mongodb.sink.writer;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
@@ -28,18 +25,8 @@ import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializat
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
-import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.util.UserCodeClassLoader;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -63,7 +50,6 @@ import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.Optional;
-import java.util.OptionalLong;
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
@@ -90,7 +76,7 @@ public class MongoWriterITCase {
MongoTestUtil.createMongoDBContainer(LOG);
private static MongoClient mongoClient;
- private static MetricListener metricListener;
+ private static TestSinkInitContext sinkInitContext;
@BeforeAll
static void beforeAll() {
@@ -106,7 +92,7 @@ public class MongoWriterITCase {
@BeforeEach
void setUp() {
- metricListener = new MetricListener();
+ sinkInitContext = new TestSinkInitContext();
}
@Test
@@ -192,8 +178,8 @@ public class MongoWriterITCase {
try (final MongoWriter<Document> writer =
createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
- final Optional<Counter> recordsSend =
- metricListener.getCounter(MetricNames.NUM_RECORDS_SEND);
+ final Counter recordsSend = sinkInitContext.getNumRecordsOutCounter();
+
writer.write(buildMessage(1), null);
// Update existing index
writer.write(buildMessage(2, "u"), null);
@@ -202,8 +188,7 @@ public class MongoWriterITCase {
writer.doBulkWrite();
- assertThat(recordsSend.isPresent()).isTrue();
- assertThat(recordsSend.get().getCount()).isEqualTo(3L);
+ assertThat(recordsSend.getCount()).isEqualTo(3L);
}
}
@@ -217,8 +202,7 @@ public class MongoWriterITCase {
try (final MongoWriter<Document> writer =
createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
- final Optional<Gauge<Long>> currentSendTime =
- metricListener.getGauge("currentSendTime");
+ final Optional<Gauge<Long>> currentSendTime = sinkInitContext.getCurrentSendTimeGauge();
assertThat(currentSendTime.isPresent()).isTrue();
assertThat(currentSendTime.get().getValue()).isEqualTo(Long.MAX_VALUE);
@@ -253,16 +237,13 @@ public class MongoWriterITCase {
.setMaxRetries(0)
.build();
- Sink.InitContext initContext = new MockInitContext(metricListener);
-
MongoSerializationSchema<Document> testSerializationSchema =
(element, context) -> {
- assertThat(context.getSubtaskId()).isEqualTo(0);
- assertThat(context.getNumberOfParallelSubtasks()).isEqualTo(1);
+ assertThat(context.getInitContext().getSubtaskId()).isEqualTo(0);
assertThat(context.getWriteOptions()).isEqualTo(expectOptions);
assertThat(context.processTime())
.isEqualTo(
- initContext
+ sinkInitContext
.getProcessingTimeService()
.getCurrentProcessingTime());
return new InsertOneModel<>(element.toBsonDocument());
@@ -274,7 +255,6 @@ public class MongoWriterITCase {
batchSize,
batchIntervalMs,
flushOnCheckpoint,
- initContext,
testSerializationSchema)) {
writer.write(buildMessage(1), null);
writer.write(buildMessage(2), null);
@@ -294,7 +274,6 @@ public class MongoWriterITCase {
batchSize,
batchIntervalMs,
flushOnCheckpoint,
- new MockInitContext(metricListener),
new UpsertSerializationSchema());
}
@@ -303,7 +282,6 @@ public class MongoWriterITCase {
int batchSize,
long batchIntervalMs,
boolean flushOnCheckpoint,
- Sink.InitContext initContext,
MongoSerializationSchema<Document> serializationSchema) {
MongoConnectionOptions connectionOptions =
@@ -324,7 +302,7 @@ public class MongoWriterITCase {
connectionOptions,
writeOptions,
flushOnCheckpoint,
- initContext,
+ sinkInitContext,
serializationSchema);
}
@@ -365,71 +343,4 @@ public class MongoWriterITCase {
}
}
}
-
- private static class MockInitContext implements Sink.InitContext {
-
- private final OperatorIOMetricGroup ioMetricGroup;
- private final SinkWriterMetricGroup metricGroup;
- private final ProcessingTimeService timeService;
-
- private MockInitContext(MetricListener metricListener) {
- this.ioMetricGroup =
- UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
- .getIOMetricGroup();
- MetricGroup metricGroup = metricListener.getMetricGroup();
- this.metricGroup = InternalSinkWriterMetricGroup.mock(metricGroup, ioMetricGroup);
- this.timeService = new TestProcessingTimeService();
- }
-
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public MailboxExecutor getMailboxExecutor() {
- return new SyncMailboxExecutor();
- }
-
- @Override
- public ProcessingTimeService getProcessingTimeService() {
- return timeService;
- }
-
- @Override
- public int getSubtaskId() {
- return 0;
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return 1;
- }
-
- @Override
- public SinkWriterMetricGroup metricGroup() {
- return metricGroup;
- }
-
- @Override
- public OptionalLong getRestoredCheckpointId() {
- return OptionalLong.empty();
- }
-
- @Override
- public SerializationSchema.InitializationContext
- asSerializationSchemaInitializationContext() {
- return new SerializationSchema.InitializationContext() {
- @Override
- public MetricGroup getMetricGroup() {
- return metricGroup;
- }
-
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return null;
- }
- };
- }
- }
}
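
[Editor's context: TestSinkInitContext ships in the flink-connector-base test jar and replaces the hand-rolled MockInitContext removed above; it is a ready-made Sink.InitContext with accessors for the standard sink metrics. A minimal sketch of the pattern the updated tests use, restricted to the calls visible in this diff; the wrapper class is illustrative.]

    import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
    import org.apache.flink.metrics.Counter;

    class SinkInitContextSketch {
        void example() {
            // Drop-in init context for writer tests; both accessors below are
            // the ones exercised by the updated MongoWriterITCase.
            TestSinkInitContext ctx = new TestSinkInitContext();
            Counter recordsOut = ctx.getNumRecordsOutCounter();
            long now = ctx.getProcessingTimeService().getCurrentProcessingTime();
        }
    }
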
diff --git a/flink-connector-mongodb/src/test/resources/archunit.properties b/flink-connector-mongodb/src/test/resources/archunit.properties
index 9d163eb..741bad3 100644
--- a/flink-connector-mongodb/src/test/resources/archunit.properties
+++ b/flink-connector-mongodb/src/test/resources/archunit.properties
@@ -29,3 +29,6 @@ freeze.store.default.allowStoreUpdate=true
#freeze.refreeze=true
freeze.store.default.path=archunit-violations
+
+# TRUE by default since 0.23.0, restore the old behavior by setting the ArchUnit property archRule.failOnEmptyShould=false
+archRule.failOnEmptyShould=false
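
[Editor's context: as the comment above notes, since ArchUnit 0.23.0 a rule fails when its should-clause is evaluated against an empty set of classes; the property restores the pre-0.23.0 behavior. A sketch of a rule shape that would trip the new default; the package filter is hypothetical and not from this repository.]

    import com.tngtech.archunit.lang.ArchRule;

    import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.classes;

    class EmptyShouldSketch {
        // If no classes match the 'that' clause, ArchUnit >= 0.23.0 fails the
        // evaluation unless archRule.failOnEmptyShould=false is configured.
        static final ArchRule RULE =
                classes().that().resideInAPackage("..some.empty.package..")
                        .should().bePublic();
    }
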