[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365130#comment-16365130
 ] 

ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168377242
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
    @@ -1910,86 +1959,171 @@ public void cancel() {
     
                        
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
     
    -                   final StreamExecutionEnvironment readEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    -                   
readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    -                   readEnv.getConfig().disableSysoutLogging();
    -                   readEnv.setParallelism(parallelism);
    +                   if (validateSequence(topicName, parallelism, 
deserSchema, numElements)) {
    +                           // everything is good!
    +                           return topicName;
    +                   }
    +                   else {
    +                           deleteTestTopic(topicName);
    +                           // fall through the loop
    +                   }
    +           }
     
    -                   Properties readProps = (Properties) 
standardProps.clone();
    -                   readProps.setProperty("group.id", 
"flink-tests-validator");
    -                   readProps.putAll(secureProps);
    -                   FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> 
consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
    +           throw new Exception("Could not write a valid sequence to Kafka 
after " + maxNumAttempts + " attempts");
    +   }
     
    -                   readEnv
    -                                   .addSource(consumer)
    -                                   .map(new 
RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
    +   protected void writeAppendSequence(
    +                   String topicName,
    +                   final int originalNumElements,
    +                   final int numElementsToAppend,
    +                   final int parallelism) throws Exception {
     
    -                                           private final int totalCount = 
parallelism * numElements;
    -                                           private int count = 0;
    +           LOG.info("\n===================================\n" +
    +                   "== Appending sequence of " + numElementsToAppend + " 
into " + topicName +
    +                   "===================================");
     
    -                                           @Override
    -                                           public Tuple2<Integer, Integer> 
map(Tuple2<Integer, Integer> value) throws Exception {
    -                                                   if (++count == 
totalCount) {
    -                                                           throw new 
SuccessException();
    -                                                   } else {
    -                                                           return value;
    -                                                   }
    -                                           }
    -                                   }).setParallelism(1)
    -                                   .addSink(new 
DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
    +           final TypeInformation<Tuple2<Integer, Integer>> resultType =
    +                   TypeInformation.of(new TypeHint<Tuple2<Integer, 
Integer>>() {});
     
    -                   final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
    +           final KeyedSerializationSchema<Tuple2<Integer, Integer>> 
serSchema =
    +                   new KeyedSerializationSchemaWrapper<>(
    +                           new 
TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
     
    -                   Thread runner = new Thread() {
    -                           @Override
    -                           public void run() {
    -                                   try {
    -                                           tryExecute(readEnv, "sequence 
validation");
    -                                   } catch (Throwable t) {
    -                                           errorRef.set(t);
    -                                   }
    +           final KeyedDeserializationSchema<Tuple2<Integer, Integer>> 
deserSchema =
    +                   new KeyedDeserializationSchemaWrapper<>(
    +                           new 
TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
    +
    +           // -------- Write the append sequence --------
    +
    +           StreamExecutionEnvironment writeEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           
writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +           writeEnv.getConfig().disableSysoutLogging();
    +
    +           DataStream<Tuple2<Integer, Integer>> stream = 
writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
    +
    +                   private boolean running = true;
    +
    +                   @Override
    +                   public void run(SourceContext<Tuple2<Integer, Integer>> 
ctx) throws Exception {
    +                           int cnt = originalNumElements;
    +                           int partition = 
getRuntimeContext().getIndexOfThisSubtask();
    +
    +                           while (running && cnt < numElementsToAppend + 
originalNumElements) {
    +                                   ctx.collect(new Tuple2<>(partition, 
cnt));
    +                                   cnt++;
                                }
    -                   };
    -                   runner.start();
    +                   }
     
    -                   final long deadline = System.nanoTime() + 
10_000_000_000L;
    -                   long delay;
    -                   while (runner.isAlive() && (delay = deadline - 
System.nanoTime()) > 0) {
    -                           runner.join(delay / 1_000_000L);
    +                   @Override
    +                   public void cancel() {
    +                           running = false;
                        }
    +           }).setParallelism(parallelism);
     
    -                   boolean success;
    +           // the producer must not produce duplicates
    +           Properties producerProperties = 
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
    +           producerProperties.setProperty("retries", "0");
    +           producerProperties.putAll(secureProps);
     
    -                   if (runner.isAlive()) {
    -                           // did not finish in time, maybe the producer 
dropped one or more records and
    -                           // the validation did not reach the exit point
    -                           success = false;
    -                           
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
    -                   }
    -                   else {
    -                           Throwable error = errorRef.get();
    -                           if (error != null) {
    -                                   success = false;
    -                                   LOG.info("Attempt " + attempt + " 
failed with exception", error);
    +           kafkaServer.produceIntoKafka(stream, topicName, serSchema, 
producerProperties, new Tuple2FlinkPartitioner(parallelism))
    +                   .setParallelism(parallelism);
    +
    +           try {
    +                   writeEnv.execute("Write sequence");
    +           }
    +           catch (Exception e) {
    +                   throw new Exception("Failed to append sequence to 
Kafka; append job failed.", e);
    +           }
    +
    +           LOG.info("Finished writing append sequence");
    +
    +           // we need to validate the sequence, because kafka's producers 
are not exactly once
    +           LOG.info("Validating sequence");
    +           
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
    +
    +           if (!validateSequence(topicName, parallelism, deserSchema, 
originalNumElements + numElementsToAppend)) {
    +                   throw new Exception("Could not append a valid sequence 
to Kafka.");
    +           }
    +   }
    +
    +   private boolean validateSequence(
    +                   final String topic,
    +                   final int parallelism,
    +                   KeyedDeserializationSchema<Tuple2<Integer, Integer>> 
deserSchema,
    +                   final int totalNumElements) throws Exception {
    +
    +           final StreamExecutionEnvironment readEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           
readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +           readEnv.getConfig().disableSysoutLogging();
    +           readEnv.setParallelism(parallelism);
    +
    +           Properties readProps = (Properties) standardProps.clone();
    +           readProps.setProperty("group.id", "flink-tests-validator");
    +           readProps.putAll(secureProps);
    +           FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = 
kafkaServer.getConsumer(topic, deserSchema, readProps);
    +           consumer.setStartFromEarliest();
    +
    +           readEnv
    +                   .addSource(consumer)
    +                   .map(new RichMapFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>() {
    +
    +                           private final int totalCount = parallelism * 
totalNumElements;
    +                           private int count = 0;
    +
    +                           @Override
    +                           public Tuple2<Integer, Integer> 
map(Tuple2<Integer, Integer> value) throws Exception {
    +                                   System.out.println(count);
    --- End diff --
    
    Indeed, will remove.


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6352
>                 URL: https://issues.apache.org/jira/browse/FLINK-6352
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Fang Yong
>            Assignee: Fang Yong
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
>     Currently "auto.offset.reset" is used to initialize the start offset of 
> FlinkKafkaConsumer, and its value must be earliest/latest/none. This setting 
> only lets the job consume from the beginning or from the most recent data; it 
> cannot specify an arbitrary position in Kafka from which to start consuming. 
>     So, there should be a configuration item (such as 
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that 
> allows the user to configure the initial offset in Kafka. The behavior of 
> "flink.source.start.time" is as follows:
> 1) job start from checkpoint / savepoint
>   a> offset of partition can be restored from checkpoint/savepoint,  
> "flink.source.start.time" will be ignored.
>   b> there's no checkpoint/savepoint for the partition (for example, the 
> partition was newly added), the "flink.source.start.time" will be used to 
> initialize the offset of the partition
> 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used 
> to initialize the offset of the kafka
>   a> the "flink.source.start.time" is valid, use it to set the offset of kafka
>   b> the "flink.source.start.time" is out-of-range: behave the same as it does 
> currently with no initial offset, i.e. get Kafka's current offset and start reading



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to