[ 
https://issues.apache.org/jira/browse/FLINK-4177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516321#comment-15516321
 ] 

ASF GitHub Bot commented on FLINK-4177:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r80235122
  
    --- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws 
IOException {
                }
                scanner.close();
     
    -
    -           // Tell cassandra where the configuration files are.
    -           // Use the test configuration file.
    -           System.setProperty("cassandra.config", 
tmp.getAbsoluteFile().toURI().toString());
    -
                if (EMBEDDED) {
    -                   cassandra = new EmbeddedCassandraService();
    -                   cassandra.start();
    +                   String javaCommand = getJavaCommandPath();
    +
    +                   // create a logging file for the process
    +                   File tempLogFile = File.createTempFile("testlogconfig", 
"properties");
    +                   CommonTestUtils.printLog4jDebugConfig(tempLogFile);
    +
    +                   // start the task manager process
    +                   String[] command = new String[]{
    +                           javaCommand,
    +                           "-Dlog.level=DEBUG",
    +                           "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
    +                           "-Dcassandra.config=" + tmp.toURI(),
    +                           // these options were taken directly from the 
jvm.options file in the cassandra repo
    +                           "-XX:+UseThreadPriorities",
    +                           "-Xss256k",
    +                           "-XX:+AlwaysPreTouch",
    +                           "-XX:+UseTLAB",
    +                           "-XX:+ResizeTLAB",
    +                           "-XX:+UseNUMA",
    +                           "-XX:+PerfDisableSharedMem",
    +                           "-XX:+UseParNewGC",
    +                           "-XX:+UseConcMarkSweepGC",
    +                           "-XX:+CMSParallelRemarkEnabled",
    +                           "-XX:SurvivorRatio=8",
    +                           "-XX:MaxTenuringThreshold=1",
    +                           "-XX:CMSInitiatingOccupancyFraction=75",
    +                           "-XX:+UseCMSInitiatingOccupancyOnly",
    +                           "-XX:CMSWaitDuration=10000",
    +                           "-XX:+CMSParallelInitialMarkEnabled",
    +                           "-XX:+CMSEdenChunksRecordAlways",
    +                           "-XX:+CMSClassUnloadingEnabled",
    +
    +                           "-classpath", getCurrentClasspath(),
    +                           EmbeddedCassandraService.class.getName()
    +                   };
    +
    +                   ProcessBuilder bld = new ProcessBuilder(command);
    +                   cassandra = bld.start();
    +                   sw = new StringWriter();
    +                   new PipeForwarder(cassandra.getErrorStream(), sw);
                }
     
    -           try {
    -                   Thread.sleep(1000 * 10);
    -           } catch (InterruptedException e) { //give cassandra a few 
seconds to start up
    +           int attempt = 0;
    +           while (true) {
    +                   try {
    +                           attempt++;
    +                           cluster = builder.getCluster();
    +                           session = cluster.connect();
    +                           break;
    +                   } catch (Exception e) {
    +                           if (attempt > 30) {
    +                                   throw e;
    +                           }
    +                           try {
    +                                   Thread.sleep(1000);
    +                           } catch (InterruptedException e1) {
    +                           }
    +                   }
                }
    -
    -           cluster = builder.getCluster();
    -           session = cluster.connect();
    +           LOG.debug("Connection established after " + attempt + " 
attempts.");
     
                session.execute(CREATE_KEYSPACE_QUERY);
    -           session.execute(CREATE_TABLE_QUERY);
    -   }
    -
    -   @BeforeClass
    -   public static void startFlink() throws Exception {
    -           Configuration config = new Configuration();
    -           config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
4);
    +           session.execute(CREATE_TABLE_QUERY.replace("$TABLE", 
"flink_initial"));
     
    -           flinkCluster = new ForkableFlinkMiniCluster(config);
    -           flinkCluster.start();
    +           try {
    +                   Thread.sleep(5000);
    --- End diff --
    
    Can this not be modeled by attempting to execute operations on the 
table? Or is the remaining work not guaranteed to succeed, even if the first 
table operations succeed?


> CassandraConnectorTest.testCassandraCommitter causing unstable builds
> ---------------------------------------------------------------------
>
>                 Key: FLINK-4177
>                 URL: https://issues.apache.org/jira/browse/FLINK-4177
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Robert Metzger
>              Labels: test-stability
>
> This build: https://api.travis-ci.org/jobs/143272982/log.txt?deansi=true 
> failed with
> {code}
> 07/08/2016 09:59:12   Job execution switched to status FINISHED.
> Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 146.646 sec 
> <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest
> testCassandraCommitter(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest)
>   Time elapsed: 9.057 sec  <<< ERROR!
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout 
> during write query at consistency LOCAL_SERIAL (1 replica were required but 
> only 0 acknowledged the write)
>       at 
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:73)
>       at 
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:26)
>       at 
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>       at 
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>       at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
>       at 
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
>       at 
> org.apache.flink.streaming.connectors.cassandra.CassandraCommitter.open(CassandraCommitter.java:103)
>       at 
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest.testCassandraCommitter(CassandraConnectorTest.java:284)
> Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: 
> Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica 
> were required but only 0 acknowledged the write)
>       at 
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100)
>       at 
> com.datastax.driver.core.Responses$Error.asException(Responses.java:122)
>       at 
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
>       at 
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
>       at 
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
>       at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
>       at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
>       at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at 
> io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
>       at 
> io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
>       at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: 
> Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica 
> were required but only 0 acknowledged the write)
>       at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:59)
>       at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
>       at 
> com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
>       at 
> com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
>       at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at 
> io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
>       at 
> io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
>       at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to