[ https://issues.apache.org/jira/browse/FLINK-4177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516321#comment-15516321 ]
ASF GitHub Bot commented on FLINK-4177: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2484#discussion_r80235122 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException { } scanner.close(); - - // Tell cassandra where the configuration files are. - // Use the test configuration file. - System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString()); - if (EMBEDDED) { - cassandra = new EmbeddedCassandraService(); - cassandra.start(); + String javaCommand = getJavaCommandPath(); + + // create a logging file for the process + File tempLogFile = File.createTempFile("testlogconfig", "properties"); + CommonTestUtils.printLog4jDebugConfig(tempLogFile); + + // start the task manager process + String[] command = new String[]{ + javaCommand, + "-Dlog.level=DEBUG", + "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), + "-Dcassandra.config=" + tmp.toURI(), + // these options were taken directly from the jvm.options file in the cassandra repo + "-XX:+UseThreadPriorities", + "-Xss256k", + "-XX:+AlwaysPreTouch", + "-XX:+UseTLAB", + "-XX:+ResizeTLAB", + "-XX:+UseNUMA", + "-XX:+PerfDisableSharedMem", + "-XX:+UseParNewGC", + "-XX:+UseConcMarkSweepGC", + "-XX:+CMSParallelRemarkEnabled", + "-XX:SurvivorRatio=8", + "-XX:MaxTenuringThreshold=1", + "-XX:CMSInitiatingOccupancyFraction=75", + "-XX:+UseCMSInitiatingOccupancyOnly", + "-XX:CMSWaitDuration=10000", + "-XX:+CMSParallelInitialMarkEnabled", + "-XX:+CMSEdenChunksRecordAlways", + "-XX:+CMSClassUnloadingEnabled", + + "-classpath", getCurrentClasspath(), + EmbeddedCassandraService.class.getName() + }; + + ProcessBuilder bld = new ProcessBuilder(command); + cassandra = bld.start(); + sw = new StringWriter(); + new 
PipeForwarder(cassandra.getErrorStream(), sw); } - try { - Thread.sleep(1000 * 10); - } catch (InterruptedException e) { //give cassandra a few seconds to start up + int attempt = 0; + while (true) { + try { + attempt++; + cluster = builder.getCluster(); + session = cluster.connect(); + break; + } catch (Exception e) { + if (attempt > 30) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + } + } } - - cluster = builder.getCluster(); - session = cluster.connect(); + LOG.debug("Connection established after " + attempt + " attempts."); session.execute(CREATE_KEYSPACE_QUERY); - session.execute(CREATE_TABLE_QUERY); - } - - @BeforeClass - public static void startFlink() throws Exception { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); + session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_initial")); - flinkCluster = new ForkableFlinkMiniCluster(config); - flinkCluster.start(); + try { + Thread.sleep(5000); --- End diff -- Can this not be modeled via attempting to execute operations on the table? Or is the remaining work not guaranteed to succeed, even if the first table operations succeed? > CassandraConnectorTest.testCassandraCommitter causing unstable builds > --------------------------------------------------------------------- > > Key: FLINK-4177 > URL: https://issues.apache.org/jira/browse/FLINK-4177 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Streaming Connectors > Affects Versions: 1.1.0 > Reporter: Robert Metzger > Labels: test-stability > > This build: https://api.travis-ci.org/jobs/143272982/log.txt?deansi=true > failed with > {code} > 07/08/2016 09:59:12 Job execution switched to status FINISHED. > Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 146.646 sec > <<< FAILURE! 
- in > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest > testCassandraCommitter(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest) > Time elapsed: 9.057 sec <<< ERROR! > com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout > during write query at consistency LOCAL_SERIAL (1 replica were required but > only 0 acknowledged the write) > at > com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:73) > at > com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:26) > at > com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37) > at > com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245) > at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63) > at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39) > at > org.apache.flink.streaming.connectors.cassandra.CassandraCommitter.open(CassandraCommitter.java:103) > at > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest.testCassandraCommitter(CassandraConnectorTest.java:284) > Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: > Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica > were required but only 0 acknowledged the write) > at > com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100) > at > com.datastax.driver.core.Responses$Error.asException(Responses.java:122) > at > com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477) > at > com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005) > at > com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304) > at > io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304) > at > io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) > at > io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618) > at > io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329) > at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) > at java.lang.Thread.run(Thread.java:745) > Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: > Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica > were required but only 0 acknowledged the write) > at 
com.datastax.driver.core.Responses$Error$1.decode(Responses.java:59) > at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37) > at > com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266) > at > com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304) > at > io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) > at > io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618) > at > io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329) > at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)