This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/v3.0 by this push:
     new a81cbeb6 [FLINK-33017] Remove dependency on shaded guava
a81cbeb6 is described below

commit a81cbeb62b1f12a3f80ff6f2380047a2d7400194
Author: Alex Sorokoumov <aleksandr.sorokou...@gmail.com>
AuthorDate: Tue Sep 5 16:28:14 2023 -0700

    [FLINK-33017] Remove dependency on shaded guava
    
    The bump of shaded Guava in Flink 1.18 changed the import paths and
    caused the class loader to fail when loading ManagedMemoryUtils.
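
    For reference, the connector compiled against relocated imports such
    as

        import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

    while Flink 1.18 relocates its shaded Guava under a newer package
    root (guava31 at the time of writing, though the exact relocation
    root is an assumption here), so classes under the guava30 path are
    missing from the classpath at runtime.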
    
    The root cause is that shading was used as a technique to avoid
    dependency hell. Since flink-connector-kafka should work with both
    Flink 1.17 and 1.18, which use different Guava versions (and hence
    different import paths), shading did not actually solve the problem
    it was introduced for in the first place.
    
    There are several options to work around the problem. First, we could
    introduce our own shading for Guava. Second, we could check whether
    the dependency on Guava is necessary at all and, if not, remove it
    completely.
    
    This patch takes the latter route and removes the dependency on Guava
    from this connector.
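
    The replacements follow a small set of mechanical patterns. Below is
    a minimal, self-contained Java sketch of the JDK-only equivalents
    used throughout this patch; class and variable names are illustrative,
    not taken verbatim from the connector:

        import java.util.ArrayDeque;
        import java.util.Arrays;
        import java.util.Collections;
        import java.util.Deque;
        import java.util.List;
        import java.util.Map;
        import java.util.stream.Collectors;

        public final class GuavaFreeSketch {
            public static void main(String[] args) throws Exception {
                // Lists.newArrayList(a, b)   -> Arrays.asList(a, b)
                List<String> topics = Arrays.asList("input", "output");

                // ImmutableList.of(x) / of() -> singletonList(x) / emptyList()
                List<String> one = Collections.singletonList("state");
                List<String> none = Collections.emptyList();

                // ImmutableMap.of(k, v)      -> Collections.singletonMap(k, v);
                // larger map literals become explicit HashMap#put calls.
                Map<String, String> config =
                        Collections.singletonMap("simpleKey", "simpleValue");

                // Joiner.on("\n").join(...)  -> Stream + Collectors.joining
                String stack =
                        Arrays.stream(Thread.currentThread().getStackTrace())
                                .map(StackTraceElement::toString)
                                .collect(Collectors.joining("\n"));

                // Closer.create()/register() -> track AutoCloseables in a
                // Deque and close them explicitly, as KafkaWriter now does
                // with its producerCloseables deque and closeAll(...).
                Deque<AutoCloseable> closeables = new ArrayDeque<>();
                closeables.add(() -> System.out.println("closing producer"));
                for (AutoCloseable c : closeables) {
                    c.close();
                }

                System.out.println(topics + " " + one + " " + none + " "
                        + config + " " + !stack.isEmpty());
            }
        }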
---
 .../flink/tests/util/kafka/SmokeKafkaITCase.java   |  5 +--
 .../flink/connector/kafka/sink/KafkaWriter.java    | 31 +++++++--------
 .../connectors/kafka/FlinkKafkaProducer.java       | 13 +++---
 .../internals/FlinkKafkaInternalProducer.java      |  8 ++--
 .../sink/FlinkKafkaInternalProducerITCase.java     |  5 +--
 .../KafkaRecordSerializationSchemaBuilderTest.java | 14 +++----
 .../connector/kafka/sink/KafkaSinkITCase.java      |  9 +++--
 .../connector/kafka/sink/KafkaWriterITCase.java    |  7 ++--
 .../kafka/sink/TransactionToAbortCheckerTest.java  | 46 ++++++++++++++--------
 .../KafkaRecordDeserializationSchemaTest.java      |  6 +--
 .../kafka/FlinkKafkaInternalProducerITCase.java    |  7 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java    | 13 ++++--
 .../kafka/shuffle/KafkaShuffleITCase.java          | 26 ++++++------
 .../kafka/table/KafkaDynamicTableFactoryTest.java  |  4 +-
 pom.xml                                            |  6 ---
 15 files changed, 103 insertions(+), 97 deletions(-)

diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
index 726eceea..a4d0c002 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
@@ -29,8 +29,6 @@ import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.test.util.JobSubmission;
 import org.apache.flink.util.TestLoggerExtension;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -56,6 +54,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -137,7 +136,7 @@ class SmokeKafkaITCase {
         // create the required topics
         final short replicationFactor = 1;
         admin.createTopics(
-                        Lists.newArrayList(
+                        Arrays.asList(
                                 new NewTopic(inputTopic, 1, replicationFactor),
                                 new NewTopic(outputTopic, 1, replicationFactor)))
                 .all()
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 48c52388..a48731c1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -33,10 +33,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper;
 import org.apache.flink.util.FlinkRuntimeException;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -51,6 +47,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
@@ -106,9 +103,10 @@ class KafkaWriter<IN>
     // producer pool only used for exactly once
     private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool =
             new ArrayDeque<>();
-    private final Closer closer = Closer.create();
     private long lastCheckpointId;
 
+    private final Deque<AutoCloseable> producerCloseables = new ArrayDeque<>();
+
     private boolean closed = false;
     private long lastSync = System.currentTimeMillis();
 
@@ -180,7 +178,7 @@ class KafkaWriter<IN>
         } else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE
                 || deliveryGuarantee == DeliveryGuarantee.NONE) {
             this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null);
-            closer.register(this.currentProducer);
+            producerCloseables.add(this.currentProducer);
             initKafkaMetrics(this.currentProducer);
         } else {
             throw new UnsupportedOperationException(
@@ -239,21 +237,18 @@ class KafkaWriter<IN>
             currentProducer = getTransactionalProducer(checkpointId + 1);
             currentProducer.beginTransaction();
         }
-        return ImmutableList.of(kafkaWriterState);
+        return Collections.singletonList(kafkaWriterState);
     }
 
     @Override
     public void close() throws Exception {
         closed = true;
         LOG.debug("Closing writer with {}", currentProducer);
-        closeAll(
-                this::abortCurrentProducer,
-                closer,
-                producerPool::clear,
-                () -> {
-                    checkState(currentProducer.isClosed());
-                    currentProducer = null;
-                });
+        closeAll(this::abortCurrentProducer, producerPool::clear);
+        closeAll(producerCloseables);
+        checkState(
+                currentProducer.isClosed(), "Could not close current producer " + currentProducer);
+        currentProducer = null;
 
         // Rethrow exception for the case in which close is called before writer() and flush().
         checkAsyncException();
@@ -282,7 +277,8 @@ class KafkaWriter<IN>
 
     void abortLingeringTransactions(
             Collection<KafkaWriterState> recoveredStates, long startCheckpointId) {
-        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+        List<String> prefixesToAbort = new ArrayList<>();
+        prefixesToAbort.add(transactionalIdPrefix);
 
         final Optional<KafkaWriterState> lastStateOpt = recoveredStates.stream().findFirst();
         if (lastStateOpt.isPresent()) {
@@ -340,7 +336,7 @@ class KafkaWriter<IN>
         FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
         if (producer == null) {
             producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
-            closer.register(producer);
+            producerCloseables.add(producer);
             producer.initTransactions();
             initKafkaMetrics(producer);
         } else {
@@ -455,6 +451,7 @@ class KafkaWriter<IN>
                     asyncProducerException = decorateException(metadata, exception, producer);
                 }
 
+                // Checking for exceptions from previous writes
                 mailboxExecutor.submit(
                         () -> {
                             // Checking for exceptions from previous writes
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index ca81cc8e..d6cbe2e7 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -52,8 +52,6 @@ import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
@@ -1200,8 +1198,10 @@ public class FlinkKafkaProducer<IN>
         if (semantic != FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
             nextTransactionalIdHint = null;
         } else {
-            ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
-                    Lists.newArrayList(nextTransactionalIdHintState.get());
+            List<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
+                    new ArrayList<>();
+            nextTransactionalIdHintState.get().forEach(transactionalIdHints::add);
+
             if (transactionalIdHints.size() > 1) {
                 throw new IllegalStateException(
                         "There should be at most one next transactional id 
hint written by the first subtask");
@@ -1444,8 +1444,9 @@ public class FlinkKafkaProducer<IN>
                 context.getOperatorStateStore()
                         .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
 
-        ArrayList<NextTransactionalIdHint> oldTransactionalIdHints =
-                Lists.newArrayList(oldNextTransactionalIdHintState.get());
+        List<NextTransactionalIdHint> oldTransactionalIdHints = new ArrayList<>();
+        oldNextTransactionalIdHintState.get().forEach(oldTransactionalIdHints::add);
+
         if (!oldTransactionalIdHints.isEmpty()) {
             nextTransactionalIdHintState.addAll(oldTransactionalIdHints);
             // clear old state
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
index a424a816..a7d6e15b 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
@@ -22,8 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
-
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Callback;
@@ -51,10 +49,12 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 /** Internal flink kafka producer. */
 @PublicEvolving
@@ -169,7 +169,9 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
                 LOG.debug(
                         "Closed internal KafkaProducer {}. Stacktrace: {}",
                         System.identityHashCode(this),
-                        Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
+                        Arrays.stream(Thread.currentThread().getStackTrace())
+                                .map(StackTraceElement::toString)
+                                .collect(Collectors.joining("\n")));
             }
             closed = true;
         }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
index dd15ec22..69e9f19f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
@@ -19,8 +19,6 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.util.TestLoggerExtension;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -43,6 +41,7 @@ import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.function.Consumer;
@@ -173,7 +172,7 @@ class FlinkKafkaInternalProducerITCase {
     }
 
     private static List<Consumer<FlinkKafkaInternalProducer<?, ?>>> provideTransactionsFinalizer() {
-        return Lists.newArrayList(
+        return Arrays.asList(
                 FlinkKafkaInternalProducer::commitTransaction,
                 FlinkKafkaInternalProducer::abortTransaction);
     }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 614624ea..d40bad41 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -23,9 +23,6 @@ import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
-
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -34,6 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -162,7 +160,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
 
     @Test
     public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception {
-        final Map<String, String> config = ImmutableMap.of("simpleKey", "simpleValue");
+        final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue");
         final KafkaRecordSerializationSchema<String> schema =
                 KafkaRecordSerializationSchema.builder()
                         .setTopic(DEFAULT_TOPIC)
@@ -179,7 +177,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
 
     @Test
     public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception {
-        final Map<String, String> config = ImmutableMap.of("simpleKey", "simpleValue");
+        final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue");
         final KafkaRecordSerializationSchema<String> schema =
                 KafkaRecordSerializationSchema.builder()
                         .setTopic(DEFAULT_TOPIC)
@@ -193,7 +191,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
 
     @Test
     public void testSerializeRecordWithKafkaSerializer() throws Exception {
-        final Map<String, String> config = ImmutableMap.of("configKey", 
"configValue");
+        final Map<String, String> config = 
Collections.singletonMap("configKey", "configValue");
         final KafkaRecordSerializationSchema<String> schema =
                 KafkaRecordSerializationSchema.builder()
                         .setTopic(DEFAULT_TOPIC)
@@ -261,7 +259,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
                             KafkaRecordSerializationSchemaBuilder<String>,
                             KafkaRecordSerializationSchemaBuilder<String>>>
             valueSerializationSetter() {
-        return ImmutableList.of(
+        return Arrays.asList(
                 (b) -> b.setKafkaValueSerializer(StringSerializer.class),
                 (b) -> b.setValueSerializationSchema(new SimpleStringSchema()),
                 (b) ->
@@ -274,7 +272,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
                             KafkaRecordSerializationSchemaBuilder<String>,
                             KafkaRecordSerializationSchemaBuilder<String>>>
             keySerializationSetter() {
-        return ImmutableList.of(
+        return Arrays.asList(
                 (b) -> b.setKafkaKeySerializer(StringSerializer.class),
                 (b) -> b.setKeySerializationSchema(new SimpleStringSchema()),
                 (b) ->
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 942902f0..fda9d6fa 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -60,8 +60,6 @@ import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
-
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -88,6 +86,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -624,7 +623,11 @@ public class KafkaSinkITCase extends TestLogger {
     }
 
     private String format(Map.Entry<Thread, StackTraceElement[]> leak) {
-        return leak.getKey().getName() + ":\n" + Joiner.on("\n").join(leak.getValue());
+        String stackTrace =
+                Arrays.stream(leak.getValue())
+                        .map(StackTraceElement::toString)
+                        .collect(Collectors.joining("\n"));
+        return leak.getKey().getName() + ":\n" + stackTrace;
     }
 
     private boolean findAliveKafkaThread(Map.Entry<Thread, StackTraceElement[]> threadStackTrace) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index c1b022dc..811ffa20 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -34,8 +34,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -60,6 +58,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -545,7 +544,7 @@ public class KafkaWriterITCase {
                 new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer),
                 new DummyRecordSerializer(),
                 new DummySchemaContext(),
-                ImmutableList.of());
+                Collections.emptyList());
     }
 
     private KafkaWriter<Integer> createWriterWithConfiguration(
@@ -557,7 +556,7 @@ public class KafkaWriterITCase {
                 sinkInitContext,
                 new DummyRecordSerializer(),
                 new DummySchemaContext(),
-                ImmutableList.of());
+                Collections.emptyList());
     }
 
     private static Properties getKafkaClientConfiguration() {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java
index 1b65c9d3..897a8591 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java
@@ -19,10 +19,10 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
-
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -32,18 +32,30 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class TransactionToAbortCheckerTest extends TestLogger {
 
     public static final String ABORT = "abort";
+    public static final String KEEP = "keep";
 
     @Test
     public void testMustAbortTransactionsWithSameSubtaskIdAndHigherCheckpointOffset() {
+        Map<Integer, Long> offsetMapping = new HashMap<>(2);
+        offsetMapping.put(0, 1L);
+        offsetMapping.put(2, 3L);
         final TransactionsToAbortChecker checker =
-                new TransactionsToAbortChecker(2, ImmutableMap.of(0, 1L, 2, 3L), 0);
+                new TransactionsToAbortChecker(2, offsetMapping, 0);
 
         // abort recovered subtasksId with equal or higher checkpoint offset
-        final Map<Integer, Map<Long, String>> openTransactions =
-                ImmutableMap.of(
-                        0, ImmutableMap.of(2L, ABORT, 1L, ABORT),
-                        2, ImmutableMap.of(3L, ABORT, 4L, ABORT),
-                        3, ImmutableMap.of(3L, "keep", 4L, "keep"));
+        final Map<Integer, Map<Long, String>> openTransactions = new HashMap<>(3);
+        final Map<Long, String> subtask0 = new HashMap<>();
+        subtask0.put(1L, ABORT);
+        subtask0.put(2L, ABORT);
+        openTransactions.put(0, subtask0);
+        final Map<Long, String> subtask2 = new HashMap<>();
+        subtask2.put(3L, ABORT);
+        subtask2.put(4L, ABORT);
+        openTransactions.put(2, subtask2);
+        final Map<Long, String> subtask3 = new HashMap<>();
+        subtask3.put(3L, KEEP);
+        subtask3.put(4L, KEEP);
+        openTransactions.put(3, subtask3);
 
         final List<String> transactionsToAbort = checker.getTransactionsToAbort(openTransactions);
         assertThat(transactionsToAbort).hasSize(4);
@@ -53,16 +65,18 @@ public class TransactionToAbortCheckerTest extends TestLogger {
     @Test
     public void testMustAbortTransactionsIfLowestCheckpointOffsetIsMinimumOffset() {
         final TransactionsToAbortChecker checker =
-                new TransactionsToAbortChecker(2, ImmutableMap.of(0, 1L), 0);
+                new TransactionsToAbortChecker(2, Collections.singletonMap(0, 1L), 0);
 
         // abort recovered subtasksId with equal or higher checkpoint offset
-        final Map<Integer, Map<Long, String>> openTransactions =
-                ImmutableMap.of(
-                        0, ImmutableMap.of(2L, ABORT, 1L, ABORT),
-                        2, ImmutableMap.of(1L, ABORT),
-                        3, ImmutableMap.of(1L, "keep"),
-                        4, ImmutableMap.of(1L, ABORT),
-                        5, ImmutableMap.of(1L, "keep"));
+        final Map<Integer, Map<Long, String>> openTransactions = new HashMap<>(5);
+        final Map<Long, String> subtask0 = new HashMap<>();
+        subtask0.put(1L, ABORT);
+        subtask0.put(2L, ABORT);
+        openTransactions.put(0, subtask0);
+        openTransactions.put(2, Collections.singletonMap(1L, ABORT));
+        openTransactions.put(3, Collections.singletonMap(1L, KEEP));
+        openTransactions.put(4, Collections.singletonMap(1L, ABORT));
+        openTransactions.put(5, Collections.singletonMap(1L, KEEP));
 
         final List<String> transactionsToAbort = checker.getTransactionsToAbort(openTransactions);
         assertThat(transactionsToAbort).hasSize(4);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index 8766719a..e764c860 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserialization
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -38,6 +37,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -116,7 +116,7 @@ public class KafkaRecordDeserializationSchemaTest {
 
     @Test
     public void testKafkaValueDeserializerWrapperWithoutConfigurable() throws Exception {
-        final Map<String, String> config = ImmutableMap.of("simpleKey", "simpleValue");
+        final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue");
         KafkaRecordDeserializationSchema<String> schema =
                 KafkaRecordDeserializationSchema.valueOnly(SimpleStringSerializer.class, config);
         schema.open(new TestingDeserializationContext());
@@ -127,7 +127,7 @@ public class KafkaRecordDeserializationSchemaTest {
 
     @Test
     public void testKafkaValueDeserializerWrapperWithConfigurable() throws Exception {
-        final Map<String, String> config = ImmutableMap.of("configKey", "configValue");
+        final Map<String, String> config = Collections.singletonMap("configKey", "configValue");
         KafkaRecordDeserializationSchema<String> schema =
                 KafkaRecordDeserializationSchema.valueOnly(
                         ConfigurableStringSerializer.class, config);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index 2f393374..15729a8c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -20,8 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -35,6 +33,7 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -241,9 +240,11 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
                 records = kafkaConsumer.poll(Duration.ofMillis(10000));
             }
 
-            ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+            final Iterator<ConsumerRecord<String, String>> it = records.iterator();
+            ConsumerRecord<String, String> record = it.next();
             assertThat(record.key()).isEqualTo(expectedKey);
             assertThat(record.value()).isEqualTo(expectedValue);
+            assertThat(it.hasNext()).isFalse();
         }
     }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index a8596ee7..88f6ac60 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -80,8 +80,6 @@ import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -104,6 +102,7 @@ import java.util.BitSet;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -258,7 +257,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
         } while (System.nanoTime() < deadline);
 
         // cancel the job & wait for the job to finish
-        client.cancel(Iterables.getOnlyElement(getRunningJobs(client))).get();
+        final Iterator<JobID> it = getRunningJobs(client).iterator();
+        final JobID jobId = it.next();
+        client.cancel(jobId).get();
+        assertThat(it.hasNext()).isFalse();
         runner.join();
 
         final Throwable t = errorRef.get();
@@ -349,7 +351,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
         } while (System.nanoTime() < deadline);
 
         // cancel the job & wait for the job to finish
-        client.cancel(Iterables.getOnlyElement(getRunningJobs(client))).get();
+        final Iterator<JobID> it = getRunningJobs(client).iterator();
+        final JobID jobId = it.next();
+        client.cancel(jobId).get();
+        assertThat(it.hasNext()).isFalse();
         runner.join();
 
         final Throwable t = errorRef.get();
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
index 9a0a14da..5505bdde 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
@@ -34,9 +34,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher.KafkaShuffleWatermark;
 import org.apache.flink.util.PropertiesUtil;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Rule;
 import org.junit.Test;
@@ -374,17 +371,16 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
 
         // Records in a single partition are kept in order
         Collection<ConsumerRecord<byte[], byte[]>> records =
-                Iterables.getOnlyElement(
-                        testKafkaShuffleProducer(
-                                        topic(
-                                                "test_serde-" + 
UUID.randomUUID(),
-                                                timeCharacteristic),
-                                        env,
-                                        1,
-                                        1,
-                                        numElementsPerProducer,
-                                        timeCharacteristic)
-                                .values());
+                testKafkaShuffleProducer(
+                                topic("test_serde-" + UUID.randomUUID(), 
timeCharacteristic),
+                                env,
+                                1,
+                                1,
+                                numElementsPerProducer,
+                                timeCharacteristic)
+                        .values()
+                        .iterator()
+                        .next();
 
         switch (timeCharacteristic) {
             case ProcessingTime:
@@ -516,7 +512,7 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
                         r -> {
                             final int partition = r.partition();
                             if (!results.containsKey(partition)) {
-                                results.put(partition, Lists.newArrayList());
+                                results.put(partition, new ArrayList<>());
                             }
                             results.get(partition).add(r);
                         });
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index a0b74a5b..0a4182bd 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -73,8 +73,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLoggerExtension;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
@@ -688,7 +686,7 @@ public class KafkaDynamicTableFactoryTest {
 
     @Test
     public void testTableSinkSemanticTranslation() {
-        final List<String> semantics = ImmutableList.of("exactly-once", "at-least-once", "none");
+        final List<String> semantics = Arrays.asList("exactly-once", "at-least-once", "none");
         final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
                 new EncodingFormatMock(",");
         for (final String semantic : semantics) {
diff --git a/pom.xml b/pom.xml
index 2302852d..c7db3228 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,12 +77,6 @@ under the License.
     </properties>
 
     <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-shaded-guava</artifactId>
-            <version>30.1.1-jre-16.1</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-jackson</artifactId>

