This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 84364dd Convert anonymous classes to lambda (#4703)
84364dd is described below
commit 84364ddc10394f7361083a3505b34f0dc5710316
Author: vzhikserg <[email protected]>
AuthorDate: Mon Jul 22 01:28:29 2019 +0200
Convert anonymous classes to lambda (#4703)
* Convert anonymous functions to lambda
* Replacing lambda with anonymous implementation, because lambda cannot be
mocked
---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 52 +++++++---------------
.../bookkeeper/mledger/util/CallbackMutexTest.java | 15 +++----
.../java/org/apache/zookeeper/MockZooKeeper.java | 6 +--
.../broker/loadbalance/LeaderElectionService.java | 20 +++------
.../pulsar/broker/service/BrokerService.java | 7 +--
.../client/api/SimpleProducerConsumerTest.java | 33 +++++---------
.../worker/PulsarFunctionLocalRunTest.java | 6 +--
.../worker/PulsarFunctionPublishTest.java | 12 +----
.../functions/worker/PulsarFunctionStateTest.java | 12 +----
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 32 +++----------
.../apache/pulsar/common/nar/NarClassLoader.java | 9 ++--
.../org/apache/pulsar/common/util/RateLimiter.java | 7 +--
.../collections/ConcurrentLongHashMapTest.java | 6 +--
.../instance/stats/ComponentStatsManager.java | 13 +++---
.../pulsar/functions/instance/ContextImplTest.java | 7 +--
.../pulsar/io/canal/CanalAbstractSource.java | 17 +------
.../PollingZooKeeperConfigurationProvider.java | 7 +--
.../pulsar/io/flume/source/AbstractSource.java | 17 +------
.../pulsar/io/hdfs2/AbstractHdfsConnector.java | 7 +--
.../pulsar/io/hdfs3/AbstractHdfsConnector.java | 7 +--
.../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 18 +++-----
.../apache/pulsar/sql/presto/PulsarMetadata.java | 52 +++++++++-------------
.../pulsar/testclient/PerformanceConsumer.java | 10 ++---
.../pulsar/testclient/PerformanceProducer.java | 10 ++---
.../broker/zookeeper/aspectj/ClientCnxnAspect.java | 7 +--
.../apache/pulsar/zookeeper/ZooKeeperCache.java | 16 ++-----
.../tests/integration/cli/AdminMultiHostTest.java | 13 +++---
.../spark/SparkStreamingPulsarReceiverTest.java | 6 +--
28 files changed, 120 insertions(+), 304 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4f27611..baaa6f6 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1294,31 +1294,25 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
final Position position = ledger.addEntry("entry-0".getBytes());
Executor executor = Executors.newCachedThreadPool();
final CountDownLatch counter = new CountDownLatch(2);
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- for (int i = 0; i < N; i++) {
- c1.markDelete(position);
- }
- counter.countDown();
- } catch (Exception e) {
- e.printStackTrace();
+ executor.execute(() -> {
+ try {
+ for (int i = 0; i < N; i++) {
+ c1.markDelete(position);
}
+ counter.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
}
});
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- for (int i = 0; i < N; i++) {
- ledger.openCursor("cursor-" + i);
- }
- counter.countDown();
- } catch (Exception e) {
- e.printStackTrace();
+ executor.execute(() -> {
+ try {
+ for (int i = 0; i < N; i++) {
+ ledger.openCursor("cursor-" + i);
}
+ counter.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
}
});
@@ -1832,12 +1826,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ManagedLedgerImpl newVersionLedger = (ManagedLedgerImpl)
factory.open("backward_test_ledger", conf);
List<LedgerInfo> mlInfo = newVersionLedger.getLedgersInfoAsList();
- assertTrue(mlInfo.stream().allMatch(new Predicate<LedgerInfo>() {
- @Override
- public boolean test(LedgerInfo ledgerInfo) {
- return ledgerInfo.hasTimestamp();
- }
- }));
+ assertTrue(mlInfo.stream().allMatch(ledgerInfo ->
ledgerInfo.hasTimestamp()));
}
@Test
@@ -2288,15 +2277,8 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
responseException1.set(exception);
}
}, ctxStr);
- ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
- @Override
- public void createComplete(int rc, LedgerHandle lh, Object ctx) {
-
- }
- }, Collections.emptyMap());
- retryStrategically((test) -> {
- return responseException1.get() != null;
- }, 5, 1000);
+ ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {},
Collections.emptyMap());
+ retryStrategically((test) -> responseException1.get() != null, 5,
1000);
assertNotNull(responseException1.get());
assertEquals(responseException1.get().getMessage(),
BKException.getMessage(BKException.Code.TimeoutException));
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java
index 599458d..3d20c0f 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java
@@ -35,15 +35,12 @@ public class CallbackMutexTest {
salary.add(1000);
// No thread competition here
// We will test thread competition in unlock()
- new Thread(new Runnable() {
- @Override
- public void run() {
- cbm.lock();
- if (salary.value() == 1000)
- salary.add(2000);
- cbm.unlock();
- Assert.assertEquals(salary.value(), 3000);
- }
+ new Thread(() -> {
+ cbm.lock();
+ if (salary.value() == 1000)
+ salary.add(2000);
+ cbm.unlock();
+ Assert.assertEquals(salary.value(), 3000);
}).start();
}
diff --git
a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
index 8c5202e..0afbcd6 100644
--- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
@@ -106,11 +106,7 @@ public class MockZooKeeper extends ZooKeeper {
private MockZooKeeper(String quorum) throws Exception {
// This constructor is never called
- super(quorum, 1, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- }
- });
+ super(quorum, 1, event -> {});
assert false;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 8e96db4..eb9747f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -99,14 +99,11 @@ public class LeaderElectionService {
log.warn("Election node {} is deleted, attempting
re-election...", event.getPath());
if (event.getPath().equals(ELECTION_ROOT)) {
log.info("This should call elect again...");
- executor.execute(new Runnable() {
- @Override
- public void run() {
- // If the node is deleted, attempt the
re-election
- log.info("Broker [{}] is calling
re-election from the thread",
- pulsar.getSafeWebServiceAddress());
- elect();
- }
+ executor.execute(() -> {
+ // If the node is deleted, attempt the
re-election
+ log.info("Broker [{}] is calling re-election
from the thread",
+ pulsar.getSafeWebServiceAddress());
+ elect();
});
}
break;
@@ -148,12 +145,7 @@ public class LeaderElectionService {
log.warn(
"Got exception [{}] while creating election node
because it already exists. Attempting re-election...",
nee.getMessage());
- executor.execute(new Runnable() {
- @Override
- public void run() {
- elect();
- }
- });
+ executor.execute(this::elect);
} catch (Exception e) {
// Kill the broker because this broker's session with
zookeeper might be stale. Killing the broker will
// make sure that we get the fresh zookeeper session.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1da2cf8..ba585b9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -275,12 +275,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
// register listener to capture zk-latency
- zkStatsListener = new EventListner() {
- @Override
- public void recordLatency(EventType eventType, long latencyMs) {
- pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
- }
- };
+ zkStatsListener = (eventType, latencyMs) ->
pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader
.loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 23b6f80..d01e6ac 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -671,13 +671,10 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads +
1);
for (int i = 0; i < numConsumersThreads; i++) {
- executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- barrier.await();
- consumer.receive();
- return null;
- }
+ executor.submit((Callable<Void>) () -> {
+ barrier.await();
+ consumer.receive();
+ return null;
});
}
@@ -712,13 +709,10 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
- executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- barrier.await();
- consumer.receive();
- return null;
- }
+ executor.submit((Callable<Void>) () -> {
+ barrier.await();
+ consumer.receive();
+ return null;
});
}
barrier.await();
@@ -742,13 +736,10 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
- executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- barrier.await();
- consumer.receive();
- return null;
- }
+ executor.submit((Callable<Void>) () -> {
+ barrier.await();
+ consumer.receive();
+ return null;
});
}
barrier.await();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index db7dbf2..ecf6567 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -136,11 +136,7 @@ public class PulsarFunctionLocalRunTest {
// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((ignoredDir, name) ->
name.startsWith("function"));
for (File file : foundFiles) {
file.delete();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index afc7cf7..d8ecce3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -123,11 +123,7 @@ public class PulsarFunctionPublishTest {
// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((ignoredDir, name) ->
name.startsWith("function"));
for (File file : foundFiles) {
file.delete();
@@ -376,11 +372,7 @@ public class PulsarFunctionPublishTest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((dir1, name) ->
name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over:
" + Arrays.asList(foundFiles));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
index 0b5c340..e49b8e8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
@@ -146,11 +146,7 @@ public class PulsarFunctionStateTest {
// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((ignoredDir, name) ->
name.startsWith("function"));
for (File file : foundFiles) {
file.delete();
@@ -411,11 +407,7 @@ public class PulsarFunctionStateTest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((dir1, name) ->
name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over:
" + Arrays.asList(foundFiles));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 9ae5384..9869f38 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -149,11 +149,7 @@ public class PulsarFunctionE2ETest {
// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((dir1, name) ->
name.startsWith("function"));
for (File file : foundFiles) {
file.delete();
@@ -162,7 +158,7 @@ public class PulsarFunctionE2ETest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
- bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () ->
PortManager.nextFreePort());
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT,
PortManager::nextFreePort);
bkEnsemble.start();
String brokerServiceUrl = "https://127.0.0.1:" +
brokerWebServiceTlsPort;
@@ -492,11 +488,7 @@ public class PulsarFunctionE2ETest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((dir1, name) ->
name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over:
" + Arrays.asList(foundFiles));
}
@@ -726,11 +718,7 @@ public class PulsarFunctionE2ETest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((dir1, name) ->
name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over:
" + Arrays.asList(foundFiles));
}
@@ -869,11 +857,7 @@ public class PulsarFunctionE2ETest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((dir1, name) ->
name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over:
" + Arrays.asList(foundFiles));
}
@@ -1221,11 +1205,7 @@ public class PulsarFunctionE2ETest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
- return name.startsWith("function");
- }
- });
+ File[] foundFiles = dir.listFiles((dir1, name) ->
name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over:
" + Arrays.asList(foundFiles));
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 0b78344..b375787 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -124,12 +124,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NarClassLoader extends URLClassLoader {
- private static final FileFilter JAR_FILTER = new FileFilter() {
- @Override
- public boolean accept(File pathname) {
- final String nameToTest = pathname.getName().toLowerCase();
- return nameToTest.endsWith(".jar") && pathname.isFile();
- }
+ private static final FileFilter JAR_FILTER = pathname -> {
+ final String nameToTest = pathname.getName().toLowerCase();
+ return nameToTest.endsWith(".jar") && pathname.isFile();
};
/**
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 177d240..e2aacdd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -239,12 +239,7 @@ public class RateLimiter implements AutoCloseable{
}
protected ScheduledFuture<?> createTask() {
- return executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- renew();
- }
- }, this.rateTime, this.rateTime, this.timeUnit);
+ return executorService.scheduleAtFixedRate(this::renew, this.rateTime,
this.rateTime, this.timeUnit);
}
synchronized void renew() {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index e4cbd47..5a9f49b 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -376,11 +376,7 @@ public class ConcurrentLongHashMapTest {
public void testComputeIfAbsent() {
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16,
1);
AtomicInteger counter = new AtomicInteger();
- LongFunction<Integer> provider = new LongFunction<Integer>() {
- public Integer apply(long key) {
- return counter.getAndIncrement();
- }
- };
+ LongFunction<Integer> provider = key -> counter.getAndIncrement();
assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
assertEquals(map.get(0).intValue(), 0);
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 716fded..daa51b7 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -77,14 +77,11 @@ public abstract class ComponentStatsManager implements
AutoCloseable {
this.collectorRegistry = collectorRegistry;
this.metricsLabels = metricsLabels;
- scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new
Runnable() {
- @Override
- public void run() {
- try {
- reset();
- } catch (Exception e) {
- log.error("Failed to reset metrics for 1min window", e);
- }
+ scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ reset();
+ } catch (Exception e) {
+ log.error("Failed to reset metrics for 1min window", e);
}
}, 1, 1, TimeUnit.MINUTES);
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 8ababe4..bfe545a 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -88,12 +88,7 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(),
new String[0],
FunctionDetails.ComponentType.FUNCTION, null);
- context.setCurrentMessageContext(new Record<String>() {
- @Override
- public String getValue() {
- return null;
- }
- });
+ context.setCurrentMessageContext((Record<String>) () -> null);
}
@Test(expectedExceptions = IllegalStateException.class)
diff --git
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
index c1bcb3d..65bcc86 100644
---
a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
+++
b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
@@ -53,13 +53,7 @@ public abstract class CanalAbstractSource<V> extends
PushSource<V> {
private static final String DESTINATION = "destination";
- protected final Thread.UncaughtExceptionHandler handler = new
Thread.UncaughtExceptionHandler() {
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- log.error("[{}] parse events has an error", t.getName(), e);
- }
- };
+ protected final Thread.UncaughtExceptionHandler handler = (t, e) ->
log.error("[{}] parse events has an error", t.getName(), e);
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
@@ -82,14 +76,7 @@ public abstract class CanalAbstractSource<V> extends
PushSource<V> {
protected void start() {
Objects.requireNonNull(connector, "connector is null");
- thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- process();
- }
- });
-
+ thread = new Thread(this::process);
thread.setName("canal source thread");
thread.setUncaughtExceptionHandler(handler);
running = true;
diff --git
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
index b80eed3..775225c 100644
---
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
+++
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
@@ -72,12 +72,7 @@ public class PollingZooKeeperConfigurationProvider extends
try {
agentNodeCache = new NodeCache(client, basePath + "/" +
getAgentName());
agentNodeCache.start();
- agentNodeCache.getListenable().addListener(new
NodeCacheListener() {
- @Override
- public void nodeChanged() throws Exception {
- refreshConfiguration();
- }
- });
+ agentNodeCache.getListenable().addListener(() ->
refreshConfiguration());
} catch (Exception e) {
client.close();
throw e;
diff --git
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
index 8c80512..fcc4c78 100644
---
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
+++
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
@@ -46,13 +46,7 @@ public abstract class AbstractSource<V> extends
PushSource<V> {
protected volatile boolean running = false;
- protected final Thread.UncaughtExceptionHandler handler = new
Thread.UncaughtExceptionHandler() {
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- log.error("[{}] parse events has an error", t.getName(), e);
- }
- };
+ protected final Thread.UncaughtExceptionHandler handler = (t, e) ->
log.error("[{}] parse events has an error", t.getName(), e);
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
@@ -69,14 +63,7 @@ public abstract class AbstractSource<V> extends
PushSource<V> {
public abstract V extractValue(String message);
protected void start() {
- thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- process();
- }
- });
-
+ thread = new Thread(this::process);
thread.setName("flume source thread");
thread.setUncaughtExceptionHandler(handler);
running = true;
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
index 163fe22..3f57dc4 100644
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
+++
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
@@ -163,12 +163,7 @@ public abstract class AbstractHdfsConnector {
protected FileSystem getFileSystemAsUser(final Configuration config,
UserGroupInformation ugi) throws IOException {
try {
- return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
- @Override
- public FileSystem run() throws Exception {
- return FileSystem.get(config);
- }
- });
+ return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () ->
FileSystem.get(config));
} catch (InterruptedException e) {
throw new IOException("Unable to create file system: " +
e.getMessage());
}
diff --git
a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
index aa22b42..f939654 100644
---
a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
+++
b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
@@ -163,12 +163,7 @@ public abstract class AbstractHdfsConnector {
protected FileSystem getFileSystemAsUser(final Configuration config,
UserGroupInformation ugi) throws IOException {
try {
- return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
- @Override
- public FileSystem run() throws Exception {
- return FileSystem.get(config);
- }
- });
+ return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () ->
FileSystem.get(config));
} catch (InterruptedException e) {
throw new IOException("Unable to create file system: " +
e.getMessage());
}
diff --git
a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 583c474..ce4aae4 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -124,10 +124,8 @@ public class JdbcSinkTest {
Record<GenericRecord> insertRecord =
PulsarRecord.<GenericRecord>builder()
.message(insertMessage)
- .topicName("fake_topic_name").ackFunction(new Runnable(){
- public void run(){
- }
- })
+ .topicName("fake_topic_name")
+ .ackFunction(() -> {})
.build();
genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
@@ -173,10 +171,8 @@ public class JdbcSinkTest {
Message<GenericRecord> updateMessage = mock(MessageImpl.class);
Record<GenericRecord> updateRecord =
PulsarRecord.<GenericRecord>builder()
.message(updateMessage)
- .topicName("fake_topic_name").ackFunction(new Runnable(){
- public void run(){
- }
- })
+ .topicName("fake_topic_name")
+ .ackFunction(() -> {})
.build();
GenericSchema<GenericRecord> updateGenericAvroSchema;
@@ -216,10 +212,8 @@ public class JdbcSinkTest {
Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
Record<GenericRecord> deleteRecord =
PulsarRecord.<GenericRecord>builder()
.message(deleteMessage)
- .topicName("fake_topic_name").ackFunction(new Runnable(){
- public void run(){
- }
- })
+ .topicName("fake_topic_name")
+ .ackFunction(() -> {})
.build();
GenericSchema<GenericRecord> deleteGenericAvroSchema = new
GenericAvroSchema(schema.getSchemaInfo());
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 19cfe5f..cce44b2 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -186,33 +186,27 @@ public class PulsarMetadata implements ConnectorMetadata {
ImmutableMap.Builder<String, ColumnHandle> columnHandles =
ImmutableMap.builder();
- tableMetaData.getColumns().forEach(new Consumer<ColumnMetadata>() {
- @Override
- public void accept(ColumnMetadata columnMetadata) {
-
- PulsarColumnMetadata pulsarColumnMetadata =
(PulsarColumnMetadata) columnMetadata;
-
- PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle(
- connectorId,
- pulsarColumnMetadata.getNameWithCase(),
- pulsarColumnMetadata.getType(),
- pulsarColumnMetadata.isHidden(),
- pulsarColumnMetadata.isInternal(),
- pulsarColumnMetadata.getFieldNames(),
- pulsarColumnMetadata.getPositionIndices());
-
- columnHandles.put(
- columnMetadata.getName(),
- pulsarColumnHandle);
- }
+ tableMetaData.getColumns().forEach(columnMetadata -> {
+
+ PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata)
columnMetadata;
+
+ PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle(
+ connectorId,
+ pulsarColumnMetadata.getNameWithCase(),
+ pulsarColumnMetadata.getType(),
+ pulsarColumnMetadata.isHidden(),
+ pulsarColumnMetadata.isInternal(),
+ pulsarColumnMetadata.getFieldNames(),
+ pulsarColumnMetadata.getPositionIndices());
+
+ columnHandles.put(
+ columnMetadata.getName(),
+ pulsarColumnHandle);
});
- PulsarInternalColumn.getInternalFields().stream().forEach(new
Consumer<PulsarInternalColumn>() {
- @Override
- public void accept(PulsarInternalColumn pulsarInternalColumn) {
- PulsarColumnHandle pulsarColumnHandle =
pulsarInternalColumn.getColumnHandle(connectorId, false);
- columnHandles.put(pulsarColumnHandle.getName(),
pulsarColumnHandle);
- }
+ PulsarInternalColumn.getInternalFields().forEach(pulsarInternalColumn
-> {
+ PulsarColumnHandle pulsarColumnHandle =
pulsarInternalColumn.getColumnHandle(connectorId, false);
+ columnHandles.put(pulsarColumnHandle.getName(),
pulsarColumnHandle);
});
return columnHandles.build();
@@ -314,12 +308,8 @@ public class PulsarMetadata implements ConnectorMetadata {
builder.addAll(getColumns(null, schema, new HashSet<>(), new
Stack<>(), new Stack<>()));
if (withInternalColumns) {
- PulsarInternalColumn.getInternalFields().stream().forEach(new
Consumer<PulsarInternalColumn>() {
- @Override
- public void accept(PulsarInternalColumn pulsarInternalColumn) {
- builder.add(pulsarInternalColumn.getColumnMetadata(false));
- }
- });
+ PulsarInternalColumn.getInternalFields().forEach(
+ pulsarInternalColumn ->
builder.add(pulsarInternalColumn.getColumnMetadata(false)));
}
return new ConnectorTableMetadata(schemaTableName, builder.build());
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 800fc80..5c569fd 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -292,12 +292,10 @@ public class PerformanceConsumer {
long start = System.nanoTime();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- printAggregatedThroughput(start);
- printAggregatedStats();
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ printAggregatedThroughput(start);
+ printAggregatedStats();
+ }));
long oldTime = System.nanoTime();
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 922f498..1430e57 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -336,12 +336,10 @@ public class PerformanceProducer {
long start = System.nanoTime();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- printAggregatedThroughput(start);
- printAggregatedStats();
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ printAggregatedThroughput(start);
+ printAggregatedStats();
+ }));
Collections.shuffle(producers);
AtomicBoolean isDone = new AtomicBoolean();
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java
index 33bd11c..e6f693c 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/broker/zookeeper/aspectj/ClientCnxnAspect.java
@@ -74,12 +74,7 @@ public class ClientCnxnAspect {
// zkResponse event shouldn't be blocked and it should be processed
// async
if (eventProcessExecutor != null &&
!eventProcessExecutor.isShutdown()) {
- eventProcessExecutor.submit(new Runnable() {
- @Override
- public void run() {
- processEvent(joinPoint);
- }
- });
+ eventProcessExecutor.submit(() -> processEvent(joinPoint));
}
}
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 7325e6c..6136481 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -204,12 +204,7 @@ public abstract class ZooKeeperCache implements Watcher {
private boolean exists(final String path, Watcher watcher) throws
KeeperException, InterruptedException {
try {
- return existsCache.get(path, new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return zkSession.get().exists(path, watcher) != null;
- }
- });
+ return existsCache.get(path, () -> zkSession.get().exists(path,
watcher) != null);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof KeeperException) {
@@ -386,12 +381,9 @@ public abstract class ZooKeeperCache implements Watcher {
public Set<String> getChildren(final String path, final Watcher watcher)
throws KeeperException, InterruptedException {
try {
- return childrenCache.get(path, new Callable<Set<String>>() {
- @Override
- public Set<String> call() throws Exception {
- LOG.debug("Fetching children at {}", path);
- return
Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher));
- }
+ return childrenCache.get(path, () -> {
+ LOG.debug("Fetching children at {}", path);
+ return
Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher));
});
} catch (ExecutionException e) {
Throwable cause = e.getCause();
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java
index db7755b..8103143 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/AdminMultiHostTest.java
@@ -80,15 +80,12 @@ public class AdminMultiHostTest {
// Because zookeeper session timeout is 30ms and ticktime is 2ms, so we
need wait more than 32ms
private void waitBrokerDown(PulsarAdmin admin, int expectBrokers, int
timeout)
throws InterruptedException, ExecutionException, TimeoutException {
- FutureTask<Boolean> futureTask = new FutureTask<>(new
Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- while (admin.brokers().getActiveBrokers(clusterName).size() !=
expectBrokers) {
- admin.brokers().healthcheck();
- TimeUnit.MILLISECONDS.sleep(1000);
- }
- return true;
+ FutureTask<Boolean> futureTask = new FutureTask<>(() -> {
+ while (admin.brokers().getActiveBrokers(clusterName).size() !=
expectBrokers) {
+ admin.brokers().healthcheck();
+ TimeUnit.MILLISECONDS.sleep(1000);
}
+ return true;
});
new Thread(futureTask).start();
futureTask.get(timeout, TimeUnit.SECONDS);
diff --git
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
index 6f2ac5e..7504948 100644
---
a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++
b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -44,7 +44,7 @@ public class SparkStreamingPulsarReceiverTest extends
PulsarTestSuite {
@Test(dataProvider = "ServiceUrls")
public void testReceivedMessage(String serviceUrl) throws Exception {
- ConsumerConfigurationData<byte[]> consConf = new
ConsumerConfigurationData();
+ ConsumerConfigurationData<byte[]> consConf = new
ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
set.add(TOPIC);
@@ -80,8 +80,8 @@ public class SparkStreamingPulsarReceiverTest extends
PulsarTestSuite {
}
@Test(dataProvider = "ServiceUrls")
- public void testDefaultSettingsOfReceiver(String serviceUrl) throws
Exception {
- ConsumerConfigurationData<byte[]> consConf = new
ConsumerConfigurationData();
+ public void testDefaultSettingsOfReceiver(String serviceUrl) {
+ ConsumerConfigurationData<byte[]> consConf = new
ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
set.add(TOPIC);