This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 84364dd  Convert anonymous classes to lambda (#4703)
84364dd is described below

commit 84364ddc10394f7361083a3505b34f0dc5710316
Author: vzhikserg <[email protected]>
AuthorDate: Mon Jul 22 01:28:29 2019 +0200

    Convert anonymous classes to lambda (#4703)
    
    * Convert anonymous functions to lambda
    
    * Replacing lambda with anonymous implementation, because lambda cannot be 
mocked
---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 52 +++++++---------------
 .../bookkeeper/mledger/util/CallbackMutexTest.java | 15 +++----
 .../java/org/apache/zookeeper/MockZooKeeper.java   |  6 +--
 .../broker/loadbalance/LeaderElectionService.java  | 20 +++------
 .../pulsar/broker/service/BrokerService.java       |  7 +--
 .../client/api/SimpleProducerConsumerTest.java     | 33 +++++---------
 .../worker/PulsarFunctionLocalRunTest.java         |  6 +--
 .../worker/PulsarFunctionPublishTest.java          | 12 +----
 .../functions/worker/PulsarFunctionStateTest.java  | 12 +----
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 32 +++----------
 .../apache/pulsar/common/nar/NarClassLoader.java   |  9 ++--
 .../org/apache/pulsar/common/util/RateLimiter.java |  7 +--
 .../collections/ConcurrentLongHashMapTest.java     |  6 +--
 .../instance/stats/ComponentStatsManager.java      | 13 +++---
 .../pulsar/functions/instance/ContextImplTest.java |  7 +--
 .../pulsar/io/canal/CanalAbstractSource.java       | 17 +------
 .../PollingZooKeeperConfigurationProvider.java     |  7 +--
 .../pulsar/io/flume/source/AbstractSource.java     | 17 +------
 .../pulsar/io/hdfs2/AbstractHdfsConnector.java     |  7 +--
 .../pulsar/io/hdfs3/AbstractHdfsConnector.java     |  7 +--
 .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java    | 18 +++-----
 .../apache/pulsar/sql/presto/PulsarMetadata.java   | 52 +++++++++-------------
 .../pulsar/testclient/PerformanceConsumer.java     | 10 ++---
 .../pulsar/testclient/PerformanceProducer.java     | 10 ++---
 .../broker/zookeeper/aspectj/ClientCnxnAspect.java |  7 +--
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    | 16 ++-----
 .../tests/integration/cli/AdminMultiHostTest.java  | 13 +++---
 .../spark/SparkStreamingPulsarReceiverTest.java    |  6 +--
 28 files changed, 120 insertions(+), 304 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4f27611..baaa6f6 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1294,31 +1294,25 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         final Position position = ledger.addEntry("entry-0".getBytes());
         Executor executor = Executors.newCachedThreadPool();
         final CountDownLatch counter = new CountDownLatch(2);
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    for (int i = 0; i < N; i++) {
-                        c1.markDelete(position);
-                    }
-                    counter.countDown();
-                } catch (Exception e) {
-                    e.printStackTrace();
+        executor.execute(() -> {
+            try {
+                for (int i = 0; i < N; i++) {
+                    c1.markDelete(position);
                 }
+                counter.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         });
 
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    for (int i = 0; i < N; i++) {
-                        ledger.openCursor("cursor-" + i);
-                    }
-                    counter.countDown();
-                } catch (Exception e) {
-                    e.printStackTrace();
+        executor.execute(() -> {
+            try {
+                for (int i = 0; i < N; i++) {
+                    ledger.openCursor("cursor-" + i);
                 }
+                counter.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         });
 
@@ -1832,12 +1826,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         ManagedLedgerImpl newVersionLedger = (ManagedLedgerImpl) 
factory.open("backward_test_ledger", conf);
         List<LedgerInfo> mlInfo = newVersionLedger.getLedgersInfoAsList();
 
-        assertTrue(mlInfo.stream().allMatch(new Predicate<LedgerInfo>() {
-            @Override
-            public boolean test(LedgerInfo ledgerInfo) {
-                return ledgerInfo.hasTimestamp();
-            }
-        }));
+        assertTrue(mlInfo.stream().allMatch(ledgerInfo -> 
ledgerInfo.hasTimestamp()));
     }
 
     @Test
@@ -2288,15 +2277,8 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
                 responseException1.set(exception);
             }
         }, ctxStr);
-        ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
-            @Override
-            public void createComplete(int rc, LedgerHandle lh, Object ctx) {
-
-            }
-        }, Collections.emptyMap());
-        retryStrategically((test) -> {
-            return responseException1.get() != null;
-        }, 5, 1000);
+        ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {}, 
Collections.emptyMap());
+        retryStrategically((test) -> responseException1.get() != null, 5, 
1000);
         assertNotNull(responseException1.get());
         assertEquals(responseException1.get().getMessage(), 
BKException.getMessage(BKException.Code.TimeoutException));
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java
index 599458d..3d20c0f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java
@@ -35,15 +35,12 @@ public class CallbackMutexTest {
         salary.add(1000);
         // No thread competition here
         // We will test thread competition in unlock()
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                cbm.lock();
-                if (salary.value() == 1000)
-                    salary.add(2000);
-                cbm.unlock();
-                Assert.assertEquals(salary.value(), 3000);
-            }
+        new Thread(() -> {
+            cbm.lock();
+            if (salary.value() == 1000)
+                salary.add(2000);
+            cbm.unlock();
+            Assert.assertEquals(salary.value(), 3000);
         }).start();
     }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java 
b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
index 8c5202e..0afbcd6 100644
--- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
@@ -106,11 +106,7 @@ public class MockZooKeeper extends ZooKeeper {
 
     private MockZooKeeper(String quorum) throws Exception {
         // This constructor is never called
-        super(quorum, 1, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-            }
-        });
+        super(quorum, 1, event -> {});
         assert false;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 8e96db4..eb9747f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -99,14 +99,11 @@ public class LeaderElectionService {
                         log.warn("Election node {} is deleted, attempting 
re-election...", event.getPath());
                         if (event.getPath().equals(ELECTION_ROOT)) {
                             log.info("This should call elect again...");
-                            executor.execute(new Runnable() {
-                                @Override
-                                public void run() {
-                                    // If the node is deleted, attempt the 
re-election
-                                    log.info("Broker [{}] is calling 
re-election from the thread",
-                                            pulsar.getSafeWebServiceAddress());
-                                    elect();
-                                }
+                            executor.execute(() -> {
+                                // If the node is deleted, attempt the 
re-election
+                                log.info("Broker [{}] is calling re-election 
from the thread",
+                                        pulsar.getSafeWebServiceAddress());
+                                elect();
                             });
                         }
                         break;
@@ -148,12 +145,7 @@ public class LeaderElectionService {
                 log.warn(
                         "Got exception [{}] while creating election node 
because it already exists. Attempting re-election...",
                         nee.getMessage());
-                executor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        elect();
-                    }
-                });
+                executor.execute(this::elect);
             } catch (Exception e) {
                 // Kill the broker because this broker's session with 
zookeeper might be stale. Killing the broker will
                 // make sure that we get the fresh zookeeper session.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1da2cf8..ba585b9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -275,12 +275,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         }
 
         // register listener to capture zk-latency
-        zkStatsListener = new EventListner() {
-            @Override
-            public void recordLatency(EventType eventType, long latencyMs) {
-                pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
-            }
-        };
+        zkStatsListener = (eventType, latencyMs) -> 
pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
 
         this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader
                 .loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 23b6f80..d01e6ac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -671,13 +671,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 
1);
         for (int i = 0; i < numConsumersThreads; i++) {
-            executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    barrier.await();
-                    consumer.receive();
-                    return null;
-                }
+            executor.submit((Callable<Void>) () -> {
+                barrier.await();
+                consumer.receive();
+                return null;
             });
         }
 
@@ -712,13 +709,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         barrier.reset();
         for (int i = 0; i < numConsumersThreads; i++) {
-            executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    barrier.await();
-                    consumer.receive();
-                    return null;
-                }
+            executor.submit((Callable<Void>) () -> {
+                barrier.await();
+                consumer.receive();
+                return null;
             });
         }
         barrier.await();
@@ -742,13 +736,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         barrier.reset();
         for (int i = 0; i < numConsumersThreads; i++) {
-            executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    barrier.await();
-                    consumer.receive();
-                    return null;
-                }
+            executor.submit((Callable<Void>) () -> {
+                barrier.await();
+                consumer.receive();
+                return null;
             });
         }
         barrier.await();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index db7dbf2..ecf6567 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -136,11 +136,7 @@ public class PulsarFunctionLocalRunTest {
 
         // delete all function temp files
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((ignoredDir, name) -> 
name.startsWith("function"));
 
         for (File file : foundFiles) {
             file.delete();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index afc7cf7..d8ecce3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -123,11 +123,7 @@ public class PulsarFunctionPublishTest {
 
         // delete all function temp files
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((ignoredDir, name) -> 
name.startsWith("function"));
 
         for (File file : foundFiles) {
             file.delete();
@@ -376,11 +372,7 @@ public class PulsarFunctionPublishTest {
 
         // make sure all temp files are deleted
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((dir1, name) -> 
name.startsWith("function"));
 
         Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: 
" + Arrays.asList(foundFiles));
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
index 0b5c340..e49b8e8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
@@ -146,11 +146,7 @@ public class PulsarFunctionStateTest {
 
         // delete all function temp files
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((ignoredDir, name) -> 
name.startsWith("function"));
 
         for (File file : foundFiles) {
             file.delete();
@@ -411,11 +407,7 @@ public class PulsarFunctionStateTest {
 
         // make sure all temp files are deleted
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((dir1, name) -> 
name.startsWith("function"));
 
         Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: 
" + Arrays.asList(foundFiles));
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 9ae5384..9869f38 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -149,11 +149,7 @@ public class PulsarFunctionE2ETest {
 
         // delete all function temp files
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((dir1, name) -> 
name.startsWith("function"));
 
         for (File file : foundFiles) {
             file.delete();
@@ -162,7 +158,7 @@ public class PulsarFunctionE2ETest {
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
-        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> 
PortManager.nextFreePort());
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager::nextFreePort);
         bkEnsemble.start();
 
         String brokerServiceUrl = "https://127.0.0.1:"; + 
brokerWebServiceTlsPort;
@@ -492,11 +488,7 @@ public class PulsarFunctionE2ETest {
 
         // make sure all temp files are deleted
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((dir1, name) -> 
name.startsWith("function"));
 
         Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: 
" + Arrays.asList(foundFiles));
     }
@@ -726,11 +718,7 @@ public class PulsarFunctionE2ETest {
 
         // make sure all temp files are deleted
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((dir1, name) -> 
name.startsWith("function"));
 
         Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: 
" + Arrays.asList(foundFiles));
     }
@@ -869,11 +857,7 @@ public class PulsarFunctionE2ETest {
 
         // make sure all temp files are deleted
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((dir1, name) -> 
name.startsWith("function"));
 
         Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: 
" + Arrays.asList(foundFiles));
     }
@@ -1221,11 +1205,7 @@ public class PulsarFunctionE2ETest {
 
         // make sure all temp files are deleted
         File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                return name.startsWith("function");
-            }
-        });
+        File[] foundFiles = dir.listFiles((dir1, name) -> 
name.startsWith("function"));
 
         Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: 
" + Arrays.asList(foundFiles));
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 0b78344..b375787 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -124,12 +124,9 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class NarClassLoader extends URLClassLoader {
 
-    private static final FileFilter JAR_FILTER = new FileFilter() {
-        @Override
-        public boolean accept(File pathname) {
-            final String nameToTest = pathname.getName().toLowerCase();
-            return nameToTest.endsWith(".jar") && pathname.isFile();
-        }
+    private static final FileFilter JAR_FILTER = pathname -> {
+        final String nameToTest = pathname.getName().toLowerCase();
+        return nameToTest.endsWith(".jar") && pathname.isFile();
     };
 
     /**
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 177d240..e2aacdd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -239,12 +239,7 @@ public class RateLimiter implements AutoCloseable{
     }
 
     protected ScheduledFuture<?> createTask() {
-        return executorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                renew();
-            }
-        }, this.rateTime, this.rateTime, this.timeUnit);
+        return executorService.scheduleAtFixedRate(this::renew, this.rateTime, 
this.rateTime, this.timeUnit);
     }
 
     synchronized void renew() {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index e4cbd47..5a9f49b 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -376,11 +376,7 @@ public class ConcurrentLongHashMapTest {
     public void testComputeIfAbsent() {
         ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 
1);
         AtomicInteger counter = new AtomicInteger();
-        LongFunction<Integer> provider = new LongFunction<Integer>() {
-            public Integer apply(long key) {
-                return counter.getAndIncrement();
-            }
-        };
+        LongFunction<Integer> provider = key -> counter.getAndIncrement();
 
         assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
         assertEquals(map.get(0).intValue(), 0);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 716fded..daa51b7 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -77,14 +77,11 @@ public abstract class ComponentStatsManager implements 
AutoCloseable {
         this.collectorRegistry = collectorRegistry;
         this.metricsLabels = metricsLabels;
 
-        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
-            @Override
-            public void run() {
-                try {
-                    reset();
-                } catch (Exception e) {
-                    log.error("Failed to reset metrics for 1min window", e);
-                }
+        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                reset();
+            } catch (Exception e) {
+                log.error("Failed to reset metrics for 1min window", e);
             }
         }, 1, 1, TimeUnit.MINUTES);
     }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 8ababe4..bfe545a 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -88,12 +88,7 @@ public class ContextImplTest {
             client,
             new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), 
new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null);
-        context.setCurrentMessageContext(new Record<String>() {
-            @Override
-            public String getValue() {
-                return null;
-            }
-        });
+        context.setCurrentMessageContext((Record<String>) () -> null);
     }
 
     @Test(expectedExceptions = IllegalStateException.class)
diff --git 
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
 
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
index c1bcb3d..65bcc86 100644
--- 
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
+++ 
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
@@ -53,13 +53,7 @@ public abstract class CanalAbstractSource<V> extends 
PushSource<V> {
 
     private static final String DESTINATION = "destination";
 
-    protected final Thread.UncaughtExceptionHandler handler = new 
Thread.UncaughtExceptionHandler() {
-
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-            log.error("[{}] parse events has an error", t.getName(), e);
-        }
-    };
+    protected final Thread.UncaughtExceptionHandler handler = (t, e) -> 
log.error("[{}] parse events has an error", t.getName(), e);
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
@@ -82,14 +76,7 @@ public abstract class CanalAbstractSource<V> extends 
PushSource<V> {
 
     protected void start() {
         Objects.requireNonNull(connector, "connector is null");
-        thread = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                process();
-            }
-        });
-
+        thread = new Thread(this::process);
         thread.setName("canal source thread");
         thread.setUncaughtExceptionHandler(handler);
         running = true;
diff --git 
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
 
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
index b80eed3..775225c 100644
--- 
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
+++ 
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
@@ -72,12 +72,7 @@ public class PollingZooKeeperConfigurationProvider extends
             try {
                 agentNodeCache = new NodeCache(client, basePath + "/" + 
getAgentName());
                 agentNodeCache.start();
-                agentNodeCache.getListenable().addListener(new 
NodeCacheListener() {
-                    @Override
-                    public void nodeChanged() throws Exception {
-                        refreshConfiguration();
-                    }
-                });
+                agentNodeCache.getListenable().addListener(() -> 
refreshConfiguration());
             } catch (Exception e) {
                 client.close();
                 throw e;
diff --git 
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
 
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
index 8c80512..fcc4c78 100644
--- 
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
+++ 
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
@@ -46,13 +46,7 @@ public abstract class AbstractSource<V> extends 
PushSource<V> {
 
     protected volatile boolean running = false;
 
-    protected final Thread.UncaughtExceptionHandler handler = new 
Thread.UncaughtExceptionHandler() {
-
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-            log.error("[{}] parse events has an error", t.getName(), e);
-        }
-    };
+    protected final Thread.UncaughtExceptionHandler handler = (t, e) -> 
log.error("[{}] parse events has an error", t.getName(), e);
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
@@ -69,14 +63,7 @@ public abstract class AbstractSource<V> extends 
PushSource<V> {
     public abstract V extractValue(String message);
 
     protected void start() {
-        thread = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                process();
-            }
-        });
-
+        thread = new Thread(this::process);
         thread.setName("flume source thread");
         thread.setUncaughtExceptionHandler(handler);
         running = true;
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
index 163fe22..3f57dc4 100644
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
+++ 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
@@ -163,12 +163,7 @@ public abstract class AbstractHdfsConnector {
 
     protected FileSystem getFileSystemAsUser(final Configuration config, 
UserGroupInformation ugi) throws IOException {
         try {
-            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-                @Override
-                public FileSystem run() throws Exception {
-                    return FileSystem.get(config);
-                }
-            });
+            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> 
FileSystem.get(config));
         } catch (InterruptedException e) {
             throw new IOException("Unable to create file system: " + 
e.getMessage());
         }
diff --git 
a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
 
b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
index aa22b42..f939654 100644
--- 
a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
+++ 
b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
@@ -163,12 +163,7 @@ public abstract class AbstractHdfsConnector {
 
     protected FileSystem getFileSystemAsUser(final Configuration config, 
UserGroupInformation ugi) throws IOException {
         try {
-            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-                @Override
-                public FileSystem run() throws Exception {
-                    return FileSystem.get(config);
-                }
-            });
+            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> 
FileSystem.get(config));
         } catch (InterruptedException e) {
             throw new IOException("Unable to create file system: " + 
e.getMessage());
         }
diff --git 
a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java 
b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 583c474..ce4aae4 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -124,10 +124,8 @@ public class JdbcSinkTest {
 
         Record<GenericRecord> insertRecord = 
PulsarRecord.<GenericRecord>builder()
             .message(insertMessage)
-            .topicName("fake_topic_name").ackFunction(new Runnable(){
-                    public void run(){
-                    }
-                })
+            .topicName("fake_topic_name")
+            .ackFunction(() -> {})
             .build();
 
         genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
@@ -173,10 +171,8 @@ public class JdbcSinkTest {
         Message<GenericRecord> updateMessage = mock(MessageImpl.class);
         Record<GenericRecord> updateRecord = 
PulsarRecord.<GenericRecord>builder()
                 .message(updateMessage)
-                .topicName("fake_topic_name").ackFunction(new Runnable(){
-                    public void run(){
-                    }
-                })
+                .topicName("fake_topic_name")
+                .ackFunction(() -> {})
                 .build();
 
         GenericSchema<GenericRecord> updateGenericAvroSchema;
@@ -216,10 +212,8 @@ public class JdbcSinkTest {
         Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
         Record<GenericRecord> deleteRecord = 
PulsarRecord.<GenericRecord>builder()
                 .message(deleteMessage)
-                .topicName("fake_topic_name").ackFunction(new Runnable(){
-                    public void run(){
-                    }
-                })
+                .topicName("fake_topic_name")
+                .ackFunction(() -> {})
                 .build();
 
         GenericSchema<GenericRecord> deleteGenericAvroSchema = new 
GenericAvroSchema(schema.getSchemaInfo());
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 19cfe5f..cce44b2 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -186,33 +186,27 @@ public class PulsarMetadata implements ConnectorMetadata {
 
         ImmutableMap.Builder<String, ColumnHandle> columnHandles = 
ImmutableMap.builder();
 
-        tableMetaData.getColumns().forEach(new Consumer<ColumnMetadata>() {
-            @Override
-            public void accept(ColumnMetadata columnMetadata) {
-
-                PulsarColumnMetadata pulsarColumnMetadata = 
(PulsarColumnMetadata) columnMetadata;
-
-                PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle(
-                        connectorId,
-                        pulsarColumnMetadata.getNameWithCase(),
-                        pulsarColumnMetadata.getType(),
-                        pulsarColumnMetadata.isHidden(),
-                        pulsarColumnMetadata.isInternal(),
-                        pulsarColumnMetadata.getFieldNames(),
-                        pulsarColumnMetadata.getPositionIndices());
-
-                columnHandles.put(
-                        columnMetadata.getName(),
-                        pulsarColumnHandle);
-            }
+        tableMetaData.getColumns().forEach(columnMetadata -> {
+
+            PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) 
columnMetadata;
+
+            PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle(
+                    connectorId,
+                    pulsarColumnMetadata.getNameWithCase(),
+                    pulsarColumnMetadata.getType(),
+                    pulsarColumnMetadata.isHidden(),
+                    pulsarColumnMetadata.isInternal(),
+                    pulsarColumnMetadata.getFieldNames(),
+                    pulsarColumnMetadata.getPositionIndices());
+
+            columnHandles.put(
+                    columnMetadata.getName(),
+                    pulsarColumnHandle);
         });
 
-        PulsarInternalColumn.getInternalFields().stream().forEach(new 
Consumer<PulsarInternalColumn>() {
-            @Override
-            public void accept(PulsarInternalColumn pulsarInternalColumn) {
-                PulsarColumnHandle pulsarColumnHandle = 
pulsarInternalColumn.getColumnHandle(connectorId, false);
-                columnHandles.put(pulsarColumnHandle.getName(), 
pulsarColumnHandle);
-            }
+        PulsarInternalColumn.getInternalFields().forEach(pulsarInternalColumn 
-> {
+            PulsarColumnHandle pulsarColumnHandle = 
pulsarInternalColumn.getColumnHandle(connectorId, false);
+            columnHandles.put(pulsarColumnHandle.getName(), 
pulsarColumnHandle);
         });
 
         return columnHandles.build();
@@ -314,12 +308,8 @@ public class PulsarMetadata implements ConnectorMetadata {
         builder.addAll(getColumns(null, schema, new HashSet<>(), new 
Stack<>(), new Stack<>()));
 
         if (withInternalColumns) {
-            PulsarInternalColumn.getInternalFields().stream().forEach(new 
Consumer<PulsarInternalColumn>() {
-                @Override
-                public void accept(PulsarInternalColumn pulsarInternalColumn) {
-                    builder.add(pulsarInternalColumn.getColumnMetadata(false));
-                }
-            });
+            PulsarInternalColumn.getInternalFields().forEach(
+                pulsarInternalColumn -> 
builder.add(pulsarInternalColumn.getColumnMetadata(false)));
         }
 
         return new ConnectorTableMetadata(schemaTableName, builder.build());
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 800fc80..5c569fd 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -292,12 +292,10 @@ public class PerformanceConsumer {
 
         long start = System.nanoTime();
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                printAggregatedThroughput(start);
-                printAggregatedStats();
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            printAggregatedThroughput(start);
+            printAggregatedStats();
+        }));
 
 
         long oldTime = System.nanoTime();
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 922f498..1430e57 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -336,12 +336,10 @@ public class PerformanceProducer {
 
         long start = System.nanoTime();
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                printAggregatedThroughput(start);
-                printAggregatedStats();
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            printAggregatedThroughput(start);
+            printAggregatedStats();
+        }));
 
         Collections.shuffle(producers);
         AtomicBoolean isDone = new AtomicBoolean();
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java
index 33bd11c..e6f693c 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java
@@ -74,12 +74,7 @@ public class ClientCnxnAspect {
         // zkResponse event shouldn't be blocked and it should be processed
         // async
         if (eventProcessExecutor != null && 
!eventProcessExecutor.isShutdown()) {
-            eventProcessExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    processEvent(joinPoint);
-                }
-            });
+            eventProcessExecutor.submit(() -> processEvent(joinPoint));
         }
     }
 
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 7325e6c..6136481 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -204,12 +204,7 @@ public abstract class ZooKeeperCache implements Watcher {
 
     private boolean exists(final String path, Watcher watcher) throws 
KeeperException, InterruptedException {
         try {
-            return existsCache.get(path, new Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    return zkSession.get().exists(path, watcher) != null;
-                }
-            });
+            return existsCache.get(path, () -> zkSession.get().exists(path, 
watcher) != null);
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
             if (cause instanceof KeeperException) {
@@ -386,12 +381,9 @@ public abstract class ZooKeeperCache implements Watcher {
     public Set<String> getChildren(final String path, final Watcher watcher)
             throws KeeperException, InterruptedException {
         try {
-            return childrenCache.get(path, new Callable<Set<String>>() {
-                @Override
-                public Set<String> call() throws Exception {
-                    LOG.debug("Fetching children at {}", path);
-                    return 
Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher));
-                }
+            return childrenCache.get(path, () -> {
+                LOG.debug("Fetching children at {}", path);
+                return 
Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher));
             });
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java
index db7755b..8103143 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java
@@ -80,15 +80,12 @@ public class AdminMultiHostTest {
     // Because zookeeper session timeout is 30ms and ticktime is 2ms, so we 
need wait more than 32ms
     private void waitBrokerDown(PulsarAdmin admin, int expectBrokers, int 
timeout)
         throws InterruptedException, ExecutionException, TimeoutException {
-        FutureTask<Boolean> futureTask = new FutureTask<>(new 
Callable<Boolean>() {
-            @Override
-            public Boolean call() throws Exception {
-                while (admin.brokers().getActiveBrokers(clusterName).size() != 
expectBrokers) {
-                    admin.brokers().healthcheck();
-                    TimeUnit.MILLISECONDS.sleep(1000);
-                }
-                return true;
+        FutureTask<Boolean> futureTask = new FutureTask<>(() -> {
+            while (admin.brokers().getActiveBrokers(clusterName).size() != 
expectBrokers) {
+                admin.brokers().healthcheck();
+                TimeUnit.MILLISECONDS.sleep(1000);
             }
+            return true;
         });
         new Thread(futureTask).start();
         futureTask.get(timeout, TimeUnit.SECONDS);
diff --git 
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
 
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
index 6f2ac5e..7504948 100644
--- 
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++ 
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -44,7 +44,7 @@ public class SparkStreamingPulsarReceiverTest extends 
PulsarTestSuite {
 
     @Test(dataProvider = "ServiceUrls")
     public void testReceivedMessage(String serviceUrl) throws Exception {
-        ConsumerConfigurationData<byte[]> consConf = new 
ConsumerConfigurationData();
+        ConsumerConfigurationData<byte[]> consConf = new 
ConsumerConfigurationData<>();
 
         Set<String> set = new HashSet<>();
         set.add(TOPIC);
@@ -80,8 +80,8 @@ public class SparkStreamingPulsarReceiverTest extends 
PulsarTestSuite {
     }
 
     @Test(dataProvider = "ServiceUrls")
-    public void testDefaultSettingsOfReceiver(String serviceUrl) throws 
Exception {
-        ConsumerConfigurationData<byte[]> consConf = new 
ConsumerConfigurationData();
+    public void testDefaultSettingsOfReceiver(String serviceUrl) {
+        ConsumerConfigurationData<byte[]> consConf = new 
ConsumerConfigurationData<>();
 
         Set<String> set = new HashSet<>();
         set.add(TOPIC);

Reply via email to