This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/v3.0 by this push: new a81cbeb6 [FLINK-33017] Remove dependency on shaded guava a81cbeb6 is described below commit a81cbeb62b1f12a3f80ff6f2380047a2d7400194 Author: Alex Sorokoumov <aleksandr.sorokou...@gmail.com> AuthorDate: Tue Sep 5 16:28:14 2023 -0700 [FLINK-33017] Remove dependency on shaded guava The bump in shaded guava in Flink 1.18 changed import paths and caused the class loader to fail when loading ManagedMemoryUtils. Looking at the root cause of the issue, shading was used as a technique to avoid dependency hell. As flink-connector-kafka should work with both Flink 1.17 and 1.18 that use different guava versions (and hence library import paths), shading did not really solve the problem it was introduced for in the first place. There are several options to work around the problem. First, we could introduce our own shading for guava. Second, we could see if the dependency on guava is necessary at all and maybe remove it completely. This patch takes the latter route and removes dependency on guava from this connector. 
--- .../flink/tests/util/kafka/SmokeKafkaITCase.java | 5 +-- .../flink/connector/kafka/sink/KafkaWriter.java | 31 +++++++-------- .../connectors/kafka/FlinkKafkaProducer.java | 13 +++--- .../internals/FlinkKafkaInternalProducer.java | 8 ++-- .../sink/FlinkKafkaInternalProducerITCase.java | 5 +-- .../KafkaRecordSerializationSchemaBuilderTest.java | 14 +++---- .../connector/kafka/sink/KafkaSinkITCase.java | 9 +++-- .../connector/kafka/sink/KafkaWriterITCase.java | 7 ++-- .../kafka/sink/TransactionToAbortCheckerTest.java | 46 ++++++++++++++-------- .../KafkaRecordDeserializationSchemaTest.java | 6 +-- .../kafka/FlinkKafkaInternalProducerITCase.java | 7 ++-- .../connectors/kafka/KafkaConsumerTestBase.java | 13 ++++-- .../kafka/shuffle/KafkaShuffleITCase.java | 26 ++++++------ .../kafka/table/KafkaDynamicTableFactoryTest.java | 4 +- pom.xml | 6 --- 15 files changed, 103 insertions(+), 97 deletions(-) diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java index 726eceea..a4d0c002 100644 --- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java +++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java @@ -29,8 +29,6 @@ import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.test.util.JobSubmission; import org.apache.flink.util.TestLoggerExtension; -import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; - import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; @@ -56,6 +54,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; 
import java.nio.ByteBuffer; import java.nio.file.Path; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -137,7 +136,7 @@ class SmokeKafkaITCase { // create the required topics final short replicationFactor = 1; admin.createTopics( - Lists.newArrayList( + Arrays.asList( new NewTopic(inputTopic, 1, replicationFactor), new NewTopic(outputTopic, 1, replicationFactor))) .all() diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java index 48c52388..a48731c1 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java @@ -33,10 +33,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; -import org.apache.flink.shaded.guava30.com.google.common.io.Closer; - import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -51,6 +47,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Deque; @@ -106,9 +103,10 @@ class KafkaWriter<IN> // producer pool only used for exactly once private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool = new ArrayDeque<>(); - private final Closer closer = Closer.create(); private long lastCheckpointId; + private final Deque<AutoCloseable> 
producerCloseables = new ArrayDeque<>(); + private boolean closed = false; private long lastSync = System.currentTimeMillis(); @@ -180,7 +178,7 @@ class KafkaWriter<IN> } else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE || deliveryGuarantee == DeliveryGuarantee.NONE) { this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null); - closer.register(this.currentProducer); + producerCloseables.add(this.currentProducer); initKafkaMetrics(this.currentProducer); } else { throw new UnsupportedOperationException( @@ -239,21 +237,18 @@ class KafkaWriter<IN> currentProducer = getTransactionalProducer(checkpointId + 1); currentProducer.beginTransaction(); } - return ImmutableList.of(kafkaWriterState); + return Collections.singletonList(kafkaWriterState); } @Override public void close() throws Exception { closed = true; LOG.debug("Closing writer with {}", currentProducer); - closeAll( - this::abortCurrentProducer, - closer, - producerPool::clear, - () -> { - checkState(currentProducer.isClosed()); - currentProducer = null; - }); + closeAll(this::abortCurrentProducer, producerPool::clear); + closeAll(producerCloseables); + checkState( + currentProducer.isClosed(), "Could not close current producer " + currentProducer); + currentProducer = null; // Rethrow exception for the case in which close is called before writer() and flush(). 
checkAsyncException(); @@ -282,7 +277,8 @@ class KafkaWriter<IN> void abortLingeringTransactions( Collection<KafkaWriterState> recoveredStates, long startCheckpointId) { - List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix); + List<String> prefixesToAbort = new ArrayList<>(); + prefixesToAbort.add(transactionalIdPrefix); final Optional<KafkaWriterState> lastStateOpt = recoveredStates.stream().findFirst(); if (lastStateOpt.isPresent()) { @@ -340,7 +336,7 @@ class KafkaWriter<IN> FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll(); if (producer == null) { producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId); - closer.register(producer); + producerCloseables.add(producer); producer.initTransactions(); initKafkaMetrics(producer); } else { @@ -455,6 +451,7 @@ class KafkaWriter<IN> asyncProducerException = decorateException(metadata, exception, producer); } + // Checking for exceptions from previous writes mailboxExecutor.submit( () -> { // Checking for exceptions from previous writes diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index ca81cc8e..d6cbe2e7 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -52,8 +52,6 @@ import org.apache.flink.util.NetUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TemporaryClassLoaderContext; -import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; - import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; @@ -1200,8 +1198,10 @@ public class FlinkKafkaProducer<IN> if (semantic != 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) { nextTransactionalIdHint = null; } else { - ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints = - Lists.newArrayList(nextTransactionalIdHintState.get()); + List<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints = + new ArrayList<>(); + nextTransactionalIdHintState.get().forEach(transactionalIdHints::add); + if (transactionalIdHints.size() > 1) { throw new IllegalStateException( "There should be at most one next transactional id hint written by the first subtask"); @@ -1444,8 +1444,9 @@ public class FlinkKafkaProducer<IN> context.getOperatorStateStore() .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2); - ArrayList<NextTransactionalIdHint> oldTransactionalIdHints = - Lists.newArrayList(oldNextTransactionalIdHintState.get()); + List<NextTransactionalIdHint> oldTransactionalIdHints = new ArrayList<>(); + oldNextTransactionalIdHintState.get().forEach(oldTransactionalIdHints::add); + if (!oldTransactionalIdHints.isEmpty()) { nextTransactionalIdHintState.addAll(oldTransactionalIdHints); // clear old state diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java index a424a816..a7d6e15b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java @@ -22,8 +22,6 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.Preconditions; -import org.apache.flink.shaded.guava30.com.google.common.base.Joiner; - import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import 
org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; @@ -51,10 +49,12 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.time.Duration; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; +import java.util.stream.Collectors; /** Internal flink kafka producer. */ @PublicEvolving @@ -169,7 +169,9 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> { LOG.debug( "Closed internal KafkaProducer {}. Stacktrace: {}", System.identityHashCode(this), - Joiner.on("\n").join(Thread.currentThread().getStackTrace())); + Arrays.stream(Thread.currentThread().getStackTrace()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n"))); } closed = true; } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java index dd15ec22..69e9f19f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java @@ -19,8 +19,6 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.util.TestLoggerExtension; -import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; - import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -43,6 +41,7 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.time.Duration; +import java.util.Arrays; import java.util.List; import java.util.Properties; import 
java.util.function.Consumer; @@ -173,7 +172,7 @@ class FlinkKafkaInternalProducerITCase { } private static List<Consumer<FlinkKafkaInternalProducer<?, ?>>> provideTransactionsFinalizer() { - return Lists.newArrayList( + return Arrays.asList( FlinkKafkaInternalProducer::commitTransaction, FlinkKafkaInternalProducer::abortTransaction); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java index 614624ea..d40bad41 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java @@ -23,9 +23,6 @@ import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; - import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.serialization.Deserializer; @@ -34,6 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Before; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -162,7 +160,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger { @Test public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception { - final Map<String, String> config = ImmutableMap.of("simpleKey", "simpleValue"); + final Map<String, String> config = 
Collections.singletonMap("simpleKey", "simpleValue"); final KafkaRecordSerializationSchema<String> schema = KafkaRecordSerializationSchema.builder() .setTopic(DEFAULT_TOPIC) @@ -179,7 +177,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger { @Test public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception { - final Map<String, String> config = ImmutableMap.of("simpleKey", "simpleValue"); + final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue"); final KafkaRecordSerializationSchema<String> schema = KafkaRecordSerializationSchema.builder() .setTopic(DEFAULT_TOPIC) @@ -193,7 +191,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger { @Test public void testSerializeRecordWithKafkaSerializer() throws Exception { - final Map<String, String> config = ImmutableMap.of("configKey", "configValue"); + final Map<String, String> config = Collections.singletonMap("configKey", "configValue"); final KafkaRecordSerializationSchema<String> schema = KafkaRecordSerializationSchema.builder() .setTopic(DEFAULT_TOPIC) @@ -261,7 +259,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger { KafkaRecordSerializationSchemaBuilder<String>, KafkaRecordSerializationSchemaBuilder<String>>> valueSerializationSetter() { - return ImmutableList.of( + return Arrays.asList( (b) -> b.setKafkaValueSerializer(StringSerializer.class), (b) -> b.setValueSerializationSchema(new SimpleStringSchema()), (b) -> @@ -274,7 +272,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger { KafkaRecordSerializationSchemaBuilder<String>, KafkaRecordSerializationSchemaBuilder<String>>> keySerializationSetter() { - return ImmutableList.of( + return Arrays.asList( (b) -> b.setKafkaKeySerializer(StringSerializer.class), (b) -> b.setKeySerializationSchema(new SimpleStringSchema()), (b) -> diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java index 942902f0..fda9d6fa 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java @@ -60,8 +60,6 @@ import org.apache.flink.testutils.junit.SharedReference; import org.apache.flink.util.DockerImageVersions; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.guava30.com.google.common.base.Joiner; - import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; @@ -88,6 +86,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -624,7 +623,11 @@ public class KafkaSinkITCase extends TestLogger { } private String format(Map.Entry<Thread, StackTraceElement[]> leak) { - return leak.getKey().getName() + ":\n" + Joiner.on("\n").join(leak.getValue()); + String stackTrace = + Arrays.stream(leak.getValue()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n")); + return leak.getKey().getName() + ":\n" + stackTrace; } private boolean findAliveKafkaThread(Map.Entry<Thread, StackTraceElement[]> threadStackTrace) { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index c1b022dc..811ffa20 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -34,8 +34,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.UserCodeClassLoader; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -60,6 +58,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -545,7 +544,7 @@ public class KafkaWriterITCase { new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer), new DummyRecordSerializer(), new DummySchemaContext(), - ImmutableList.of()); + Collections.emptyList()); } private KafkaWriter<Integer> createWriterWithConfiguration( @@ -557,7 +556,7 @@ public class KafkaWriterITCase { sinkInitContext, new DummyRecordSerializer(), new DummySchemaContext(), - ImmutableList.of()); + Collections.emptyList()); } private static Properties getKafkaClientConfiguration() { diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java index 1b65c9d3..897a8591 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java @@ -19,10 +19,10 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; 
- import org.junit.Test; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,18 +32,30 @@ import static org.assertj.core.api.Assertions.assertThat; public class TransactionToAbortCheckerTest extends TestLogger { public static final String ABORT = "abort"; + public static final String KEEP = "keep"; @Test public void testMustAbortTransactionsWithSameSubtaskIdAndHigherCheckpointOffset() { + Map<Integer, Long> offsetMapping = new HashMap<>(2); + offsetMapping.put(0, 1L); + offsetMapping.put(2, 3L); final TransactionsToAbortChecker checker = - new TransactionsToAbortChecker(2, ImmutableMap.of(0, 1L, 2, 3L), 0); + new TransactionsToAbortChecker(2, offsetMapping, 0); // abort recovered subtasksId with equal or higher checkpoint offset - final Map<Integer, Map<Long, String>> openTransactions = - ImmutableMap.of( - 0, ImmutableMap.of(2L, ABORT, 1L, ABORT), - 2, ImmutableMap.of(3L, ABORT, 4L, ABORT), - 3, ImmutableMap.of(3L, "keep", 4L, "keep")); + final Map<Integer, Map<Long, String>> openTransactions = new HashMap<>(3); + final Map<Long, String> subtask0 = new HashMap<>(); + subtask0.put(1L, ABORT); + subtask0.put(2L, ABORT); + openTransactions.put(0, subtask0); + final Map<Long, String> subtask2 = new HashMap<>(); + subtask2.put(3L, ABORT); + subtask2.put(4L, ABORT); + openTransactions.put(2, subtask2); + final Map<Long, String> subtask3 = new HashMap<>(); + subtask3.put(3L, KEEP); + subtask3.put(4L, KEEP); + openTransactions.put(3, subtask3); final List<String> transactionsToAbort = checker.getTransactionsToAbort(openTransactions); assertThat(transactionsToAbort).hasSize(4); @@ -53,16 +65,18 @@ public class TransactionToAbortCheckerTest extends TestLogger { @Test public void testMustAbortTransactionsIfLowestCheckpointOffsetIsMinimumOffset() { final TransactionsToAbortChecker checker = - new TransactionsToAbortChecker(2, ImmutableMap.of(0, 1L), 0); + new TransactionsToAbortChecker(2, Collections.singletonMap(0, 
1L), 0); // abort recovered subtasksId with equal or higher checkpoint offset - final Map<Integer, Map<Long, String>> openTransactions = - ImmutableMap.of( - 0, ImmutableMap.of(2L, ABORT, 1L, ABORT), - 2, ImmutableMap.of(1L, ABORT), - 3, ImmutableMap.of(1L, "keep"), - 4, ImmutableMap.of(1L, ABORT), - 5, ImmutableMap.of(1L, "keep")); + final Map<Integer, Map<Long, String>> openTransactions = new HashMap<>(5); + final Map<Long, String> subtask0 = new HashMap<>(); + subtask0.put(1L, ABORT); + subtask0.put(2L, ABORT); + openTransactions.put(0, subtask0); + openTransactions.put(2, Collections.singletonMap(1L, ABORT)); + openTransactions.put(3, Collections.singletonMap(1L, KEEP)); + openTransactions.put(4, Collections.singletonMap(1L, ABORT)); + openTransactions.put(5, Collections.singletonMap(1L, KEEP)); final List<String> transactionsToAbort = checker.getTransactionsToAbort(openTransactions); assertThat(transactionsToAbort).hasSize(4); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java index 8766719a..e764c860 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java @@ -25,7 +25,6 @@ import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserialization import org.apache.flink.util.Collector; import org.apache.flink.util.jackson.JacksonMapperFactory; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -38,6 +37,7 @@ import org.junit.Before; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -116,7 +116,7 @@ public class KafkaRecordDeserializationSchemaTest { @Test public void testKafkaValueDeserializerWrapperWithoutConfigurable() throws Exception { - final Map<String, String> config = ImmutableMap.of("simpleKey", "simpleValue"); + final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue"); KafkaRecordDeserializationSchema<String> schema = KafkaRecordDeserializationSchema.valueOnly(SimpleStringSerializer.class, config); schema.open(new TestingDeserializationContext()); @@ -127,7 +127,7 @@ public class KafkaRecordDeserializationSchemaTest { @Test public void testKafkaValueDeserializerWrapperWithConfigurable() throws Exception { - final Map<String, String> config = ImmutableMap.of("configKey", "configValue"); + final Map<String, String> config = Collections.singletonMap("configKey", "configValue"); KafkaRecordDeserializationSchema<String> schema = KafkaRecordDeserializationSchema.valueOnly( ConfigurableStringSerializer.class, config); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java index 2f393374..15729a8c 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java @@ -20,8 +20,6 @@ package org.apache.flink.streaming.connectors.kafka; import 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer; -import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -35,6 +33,7 @@ import org.junit.Test; import java.time.Duration; import java.util.Collections; +import java.util.Iterator; import java.util.Properties; import java.util.UUID; @@ -241,9 +240,11 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase { records = kafkaConsumer.poll(Duration.ofMillis(10000)); } - ConsumerRecord<String, String> record = Iterables.getOnlyElement(records); + final Iterator<ConsumerRecord<String, String>> it = records.iterator(); + ConsumerRecord<String, String> record = it.next(); assertThat(record.key()).isEqualTo(expectedKey); assertThat(record.value()).isEqualTo(expectedValue); + assertThat(it.hasNext()).isFalse(); } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index a8596ee7..88f6ac60 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -80,8 +80,6 @@ import org.apache.flink.testutils.junit.RetryOnException; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; - import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -104,6 +102,7 @@ import java.util.BitSet; import java.util.Collections; import 
java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -258,7 +257,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { } while (System.nanoTime() < deadline); // cancel the job & wait for the job to finish - client.cancel(Iterables.getOnlyElement(getRunningJobs(client))).get(); + final Iterator<JobID> it = getRunningJobs(client).iterator(); + final JobID jobId = it.next(); + client.cancel(jobId).get(); + assertThat(it.hasNext()).isFalse(); runner.join(); final Throwable t = errorRef.get(); @@ -349,7 +351,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { } while (System.nanoTime() < deadline); // cancel the job & wait for the job to finish - client.cancel(Iterables.getOnlyElement(getRunningJobs(client))).get(); + final Iterator<JobID> it = getRunningJobs(client).iterator(); + final JobID jobId = it.next(); + client.cancel(jobId).get(); + assertThat(it.hasNext()).isFalse(); runner.join(); final Throwable t = errorRef.get(); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java index 9a0a14da..5505bdde 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java @@ -34,9 +34,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher import org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher.KafkaShuffleWatermark; import org.apache.flink.util.PropertiesUtil; -import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; -import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; - 
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Rule; import org.junit.Test; @@ -374,17 +371,16 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase { // Records in a single partition are kept in order Collection<ConsumerRecord<byte[], byte[]>> records = - Iterables.getOnlyElement( - testKafkaShuffleProducer( - topic( - "test_serde-" + UUID.randomUUID(), - timeCharacteristic), - env, - 1, - 1, - numElementsPerProducer, - timeCharacteristic) - .values()); + testKafkaShuffleProducer( + topic("test_serde-" + UUID.randomUUID(), timeCharacteristic), + env, + 1, + 1, + numElementsPerProducer, + timeCharacteristic) + .values() + .iterator() + .next(); switch (timeCharacteristic) { case ProcessingTime: @@ -516,7 +512,7 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase { r -> { final int partition = r.partition(); if (!results.containsKey(partition)) { - results.put(partition, Lists.newArrayList()); + results.put(partition, new ArrayList<>()); } results.get(partition).add(r); }); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index a0b74a5b..0a4182bd 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -73,8 +73,6 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.TestLoggerExtension; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; @@ 
-688,7 +686,7 @@ public class KafkaDynamicTableFactoryTest { @Test public void testTableSinkSemanticTranslation() { - final List<String> semantics = ImmutableList.of("exactly-once", "at-least-once", "none"); + final List<String> semantics = Arrays.asList("exactly-once", "at-least-once", "none"); final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = new EncodingFormatMock(","); for (final String semantic : semantics) { diff --git a/pom.xml b/pom.xml index 2302852d..c7db3228 100644 --- a/pom.xml +++ b/pom.xml @@ -77,12 +77,6 @@ under the License. </properties> <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-guava</artifactId> - <version>30.1.1-jre-16.1</version> - </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-jackson</artifactId>