[
https://issues.apache.org/jira/browse/FLINK-34092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
baihailiang updated FLINK-34092:
--------------------------------
Attachment: image-2024-01-15-20-36-25-511.png
Description:
run flink-test[1] on aarch64 machine in the flink root dir using command {{mvn
test -pl flink-tests -am
-Dtest=org.apache.flink.test.runtime.NettyEpollITCase#testNettyEpoll
-DfailIfNoTests=false - Dcheckstyle.skip -dtestNettyEpoll -DfailIfNoTests=false
-Dcheckstyle.skip Dcheckstyle.skip}}
{code:java}
ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.071 s
<<< FAILURE! - in org.apache.flink.test.runtime.NettyEpollITCase
[ERROR] org.apache.flink.test.runtime.NettyEpollITCase.testNettyEpoll Time
elapsed: 2.048 s <<< ERROR!
java.lang.UnsatisfiedLinkError: failed to load the required native library
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.<clinit>(EpollEventLoop.java:51)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:185)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:36)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:60)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:49)
at
org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:113)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:100)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:77)
at
org.apache.flink.runtime.io.network.netty.NettyClient.initEpollBootstrap(NettyClient.java:164)
at
org.apache.flink.runtime.io.network.netty.NettyClient.init(NettyClient.java:79)
at
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.start(NettyConnectionManager.java:87)
at
org.apache.flink.runtime.io.network.NettyShuffleEnvironment.start(NettyShuffleEnvironment.java:357)
at
org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:293)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:626)
at
org.apache.flink.runtime.minicluster.MiniCluster.startTaskManager(MiniCluster.java:753)
at
org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:734)
at
org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:452)
at
org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:234)
at
org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:109)
at
org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:64)
at
org.apache.flink.test.runtime.NettyEpollITCase.trySetUpCluster(NettyEpollITCase.java:83)
at
org.apache.flink.test.runtime.NettyEpollITCase.testNettyEpoll(NettyEpollITCase.java:50)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
at
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
at
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
at
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
at
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
at
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
Caused by: java.lang.UnsatisfiedLinkError: could not load a native library:
org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:223)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:306)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.<clinit>(Native.java:85)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:40)
... 67 more
Suppressed: java.lang.UnsatisfiedLinkError: could not load a native
library: org_apache_flink_shaded_netty4_netty_transport_native_epoll
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:223)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:309)
... 69 more
Caused by: java.io.FileNotFoundException:
META-INF/native/liborg_apache_flink_shaded_netty4_netty_transport_native_epoll.so
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:170)
... 70 more
Suppressed: java.lang.UnsatisfiedLinkError: no
org_apache_flink_shaded_netty4_netty_transport_native_epoll in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:843)
at java.lang.System.loadLibrary(System.java:1136)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:301)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
... 70 more
Suppressed: java.lang.UnsatisfiedLinkError: no
org_apache_flink_shaded_netty4_netty_transport_native_epoll in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:843)
at java.lang.System.loadLibrary(System.java:1136)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:335)
at java.security.AccessController.doPrivileged(Native Method)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:327)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:293)
... 71 more
Caused by: java.io.FileNotFoundException:
META-INF/native/liborg_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64.so
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:170)
... 70 more
Suppressed: java.lang.UnsatisfiedLinkError: no
org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64 in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:843)
at java.lang.System.loadLibrary(System.java:1136)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:301)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
... 70 more
Suppressed: java.lang.UnsatisfiedLinkError: no
org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64 in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:843)
at java.lang.System.loadLibrary(System.java:1136)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:335)
at java.security.AccessController.doPrivileged(Native Method)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:327)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:293)
... 71 more {code}
In addition to this, I've also tested myself with demos that show that netty's
epoll doesn't work on arrch64 machines
{code:java}
import io.netty.channel.epoll.Epoll;
public class test {
public static void main(String[] args) {
System.out.println("Epoll is available:" + Epoll.isAvailable());
}
}
/* <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.104.Final</version>
</dependency>
*/
//netty version is 4.1.104.Final{code}
!image-2024-01-15-20-36-25-511.png!
So I found that the reason the
org.apache.flink.test.runtime.NettyEpollITCase#testNettyEpoll test method
doesn't pass is that version 4.1.70 of netty doesn't work with epoll on arrch64
h3. solution
I think there are two solutions right now, the first one is to update the
version of netty, but I found that in that demo example above where I
introduced the latest version of netty, the result of Epoll.isAvailable() on
arsch64 is also false.
The second option is to modify
org.apache.flink.test.runtime.NettyEpollITCase#testNettyEpoll test method, add
a new assertion for determining whether netty's epoll is available or not, if
it is available then run the test, if it is unavailable then skip the test
method, so that even if in the If it is not available, the test method will be
skipped, so that even if the netty version is upgraded later, the test method
will not be affected.
If this is an issue, I hope that it would be helpful to improve Flink and if I
have a chance, I want to fix it!
{code:java}
public void testNettyEpoll() throws Exception {
// my add code
Assume.assumeTrue(Epoll.isAvailable());
MiniClusterWithClientResource cluster = trySetUpCluster();
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(NUM_TASK_MANAGERS);
DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 1, 2, 3, 42);
input.keyBy(
new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws
Exception {
return value;
}
})
.sum(0)
.print();
env.execute();
} finally {
cluster.after();
}
} {code}
was:
run flink-test[1] on aarch64 machine in the flink root dir using command {{mvn
test -pl flink-tests -am
-Dtest=org.apache.flink.test.runtime.NettyEpollITCase#testNettyEpoll
-DfailIfNoTests=false - Dcheckstyle.skip -dtestNettyEpoll -DfailIfNoTests=false
-Dcheckstyle.skip Dcheckstyle.skip}}
{code:java}
ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.071 s
<<< FAILURE! - in org.apache.flink.test.runtime.NettyEpollITCase
[ERROR] org.apache.flink.test.runtime.NettyEpollITCase.testNettyEpoll Time
elapsed: 2.048 s <<< ERROR!
java.lang.UnsatisfiedLinkError: failed to load the required native library
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.<clinit>(EpollEventLoop.java:51)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:185)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:36)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:60)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:49)
at
org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:113)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:100)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:77)
at
org.apache.flink.runtime.io.network.netty.NettyClient.initEpollBootstrap(NettyClient.java:164)
at
org.apache.flink.runtime.io.network.netty.NettyClient.init(NettyClient.java:79)
at
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.start(NettyConnectionManager.java:87)
at
org.apache.flink.runtime.io.network.NettyShuffleEnvironment.start(NettyShuffleEnvironment.java:357)
at
org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:293)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:626)
at
org.apache.flink.runtime.minicluster.MiniCluster.startTaskManager(MiniCluster.java:753)
at
org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:734)
at
org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:452)
at
org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:234)
at
org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:109)
at
org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:64)
at
org.apache.flink.test.runtime.NettyEpollITCase.trySetUpCluster(NettyEpollITCase.java:83)
at
org.apache.flink.test.runtime.NettyEpollITCase.testNettyEpoll(NettyEpollITCase.java:50)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
at
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
at
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
at
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
at
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
at
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
Caused by: java.lang.UnsatisfiedLinkError: could not load a native library:
org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:223)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:306)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.<clinit>(Native.java:85)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:40)
... 67 more
Suppressed: java.lang.UnsatisfiedLinkError: could not load a native
library: org_apache_flink_shaded_netty4_netty_transport_native_epoll
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:223)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:309)
... 69 more
Caused by: java.io.FileNotFoundException:
META-INF/native/liborg_apache_flink_shaded_netty4_netty_transport_native_epoll.so
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:170)
... 70 more
Suppressed: java.lang.UnsatisfiedLinkError: no
org_apache_flink_shaded_netty4_netty_transport_native_epoll in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:843)
at java.lang.System.loadLibrary(System.java:1136)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:301)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
... 70 more
Suppressed: java.lang.UnsatisfiedLinkError: no
org_apache_flink_shaded_netty4_netty_transport_native_epoll in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:843)
at java.lang.System.loadLibrary(System.java:1136)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:335)
at java.security.AccessController.doPrivileged(Native Method)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:327)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:293)
... 71 more
Caused by: java.io.FileNotFoundException:
META-INF/native/liborg_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64.so
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:170)
... 70 more
Suppressed: java.lang.UnsatisfiedLinkError: no
org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64 in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:843)
at java.lang.System.loadLibrary(System.java:1136)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:301)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
... 70 more
Suppressed: java.lang.UnsatisfiedLinkError: no
org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64 in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:843)
at java.lang.System.loadLibrary(System.java:1136)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:335)
at java.security.AccessController.doPrivileged(Native Method)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:327)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:293)
... 71 more {code}
In addition to this, I also tested it myself using the demo, which showed that
epoll is not available in the arrch64 netty
{code:java}
import io.netty.channel.epoll.Epoll;
public class test {
public static void main(String[] args) {
System.out.println("Epoll is available:" + Epoll.isAvailable());
}
}
/* <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.104.Final</version>
</dependency>
*/
//netty version is 4.1.104.Final{code}
Environment:
flink 1.16.2
arrch64
> NettyEpollITCase#testNettyEpoll on arrch64 running fail
> -------------------------------------------------------
>
> Key: FLINK-34092
> URL: https://issues.apache.org/jira/browse/FLINK-34092
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.16.2
> Environment: flink 1.16.2
> arrch64
> Reporter: baihailiang
> Priority: Major
> Attachments: image-2024-01-15-20-36-25-511.png
>
>
> run flink-test[1] on aarch64 machine in the flink root dir using command
> {{mvn test -pl flink-tests -am
> -Dtest=org.apache.flink.test.runtime.NettyEpollITCase#testNettyEpoll
> -DfailIfNoTests=false - Dcheckstyle.skip -dtestNettyEpoll
> -DfailIfNoTests=false -Dcheckstyle.skip Dcheckstyle.skip}}
> {code:java}
> ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.071
> s <<< FAILURE! - in org.apache.flink.test.runtime.NettyEpollITCase
> [ERROR] org.apache.flink.test.runtime.NettyEpollITCase.testNettyEpoll Time
> elapsed: 2.048 s <<< ERROR!
> java.lang.UnsatisfiedLinkError: failed to load the required native library
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.<clinit>(EpollEventLoop.java:51)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:185)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:36)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:60)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:49)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:113)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:100)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:77)
> at
> org.apache.flink.runtime.io.network.netty.NettyClient.initEpollBootstrap(NettyClient.java:164)
> at
> org.apache.flink.runtime.io.network.netty.NettyClient.init(NettyClient.java:79)
> at
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.start(NettyConnectionManager.java:87)
> at
> org.apache.flink.runtime.io.network.NettyShuffleEnvironment.start(NettyShuffleEnvironment.java:357)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:293)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:626)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManager(MiniCluster.java:753)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:734)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:452)
> at
> org.apache.flink.runtime.testutils.MiniClusterResource.startMiniCluster(MiniClusterResource.java:234)
> at
> org.apache.flink.runtime.testutils.MiniClusterResource.before(MiniClusterResource.java:109)
> at
> org.apache.flink.test.util.MiniClusterWithClientResource.before(MiniClusterWithClientResource.java:64)
> at
> org.apache.flink.test.runtime.NettyEpollITCase.trySetUpCluster(NettyEpollITCase.java:83)
> at
> org.apache.flink.test.runtime.NettyEpollITCase.testNettyEpoll(NettyEpollITCase.java:50)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> at
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
> at
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
> at
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
> at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
> at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
> at
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
> at
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
> at
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
> at
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
> at
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
> at
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
> at
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
> at
> org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
> at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> Caused by: java.lang.UnsatisfiedLinkError: could not load a native library:
> org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:223)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:306)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.<clinit>(Native.java:85)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:40)
> ... 67 more
> Suppressed: java.lang.UnsatisfiedLinkError: could not load a native
> library: org_apache_flink_shaded_netty4_netty_transport_native_epoll
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:223)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:309)
> ... 69 more
> Caused by: java.io.FileNotFoundException:
> META-INF/native/liborg_apache_flink_shaded_netty4_netty_transport_native_epoll.so
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:170)
> ... 70 more
> Suppressed: java.lang.UnsatisfiedLinkError: no
> org_apache_flink_shaded_netty4_netty_transport_native_epoll in
> java.library.path
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
> at java.lang.Runtime.loadLibrary0(Runtime.java:843)
> at java.lang.System.loadLibrary(System.java:1136)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:301)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
> ... 70 more
> Suppressed: java.lang.UnsatisfiedLinkError: no
> org_apache_flink_shaded_netty4_netty_transport_native_epoll in
> java.library.path
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
> at java.lang.Runtime.loadLibrary0(Runtime.java:843)
> at java.lang.System.loadLibrary(System.java:1136)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:335)
> at java.security.AccessController.doPrivileged(Native Method)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:327)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:293)
> ... 71 more
> Caused by: java.io.FileNotFoundException:
> META-INF/native/liborg_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64.so
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:170)
> ... 70 more
> Suppressed: java.lang.UnsatisfiedLinkError: no
> org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64 in
> java.library.path
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
> at java.lang.Runtime.loadLibrary0(Runtime.java:843)
> at java.lang.System.loadLibrary(System.java:1136)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:301)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
> ... 70 more
> Suppressed: java.lang.UnsatisfiedLinkError: no
> org_apache_flink_shaded_netty4_netty_transport_native_epoll_aarch_64 in
> java.library.path
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
> at java.lang.Runtime.loadLibrary0(Runtime.java:843)
> at java.lang.System.loadLibrary(System.java:1136)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:335)
> at java.security.AccessController.doPrivileged(Native Method)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:327)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:293)
> ... 71 more {code}
> In addition to this, I've also tested myself with demos that show that
> netty's epoll doesn't work on arrch64 machines
> {code:java}
> import io.netty.channel.epoll.Epoll;
> public class test {
> public static void main(String[] args) {
> System.out.println("Epoll is available:" + Epoll.isAvailable());
> }
> }
> /* <dependency>
> <groupId>io.netty</groupId>
> <artifactId>netty-all</artifactId>
> <version>4.1.104.Final</version>
> </dependency>
> */
> //netty version is 4.1.104.Final{code}
> !image-2024-01-15-20-36-25-511.png!
> So I found that the reason the
> org.apache.flink.test.runtime.NettyEpollITCase#testNettyEpoll test method
> doesn't pass is that version 4.1.70 of netty doesn't work with epoll on
> arrch64
> h3. solution
> I think there are two solutions right now, the first one is to update the
> version of netty, but I found that in that demo example above where I
> introduced the latest version of netty, the result of Epoll.isAvailable() on
> arsch64 is also false.
> The second option is to modify
> org.apache.flink.test.runtime.NettyEpollITCase#testNettyEpoll test method,
> add a new assertion for determining whether netty's epoll is available or
> not, if it is available then run the test, if it is unavailable then skip the
> test method, so that even if in the If it is not available, the test method
> will be skipped, so that even if the netty version is upgraded later, the
> test method will not be affected.
>
> If this is an issue, I hope that it would be helpful to improve Flink and if
> I have a chance, I want to fix it!
>
> {code:java}
> public void testNettyEpoll() throws Exception {
> // my add code
> Assume.assumeTrue(Epoll.isAvailable());
> MiniClusterWithClientResource cluster = trySetUpCluster();
> try {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(NUM_TASK_MANAGERS);
> DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 1, 2, 3, 42);
> input.keyBy(
> new KeySelector<Integer, Integer>() {
> @Override
> public Integer getKey(Integer value) throws
> Exception {
> return value;
> }
> })
> .sum(0)
> .print();
> env.execute();
> } finally {
> cluster.after();
> }
> } {code}
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)