[ 
https://issues.apache.org/jira/browse/FLINK-23854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arvid Heise updated FLINK-23854:
--------------------------------
    Description: 
The KafkaSink throws the exception when restarted with a lower parallelism and 
the exactly-once guarantee. The exception is like this.


{noformat}
java.lang.IllegalStateException: Internal error: It is expected that state from 
previous executions is distributed to the same subtask id.   
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)   
at 
org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178)
   
at 
org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130)  
 
at 
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99) 
  
at 
org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134)
   
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
   
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
   
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638)
   
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)   
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)   
at java.lang.Thread.run(Thread.java:748)    
Suppressed: java.lang.NullPointerException       
at 
org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
       
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797)
        ... 4 more
{noformat}

I start the kafka cluster(kafka_2.13-2.8.0) and the flink cluster in my own 
mac. I change the parallelism from 4 to 2 and restart the job from some 
completed checkpoint. Then the error occurs. 

And the cli command and the code are as follows.
{code:java}
// cli command
./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest -s 
/Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1
 /Users/test/project/self/target/test.jar
{code}
{code:java}
public class KafkaExactlyOnceScaleDownTest { 
public static void main(String[] args) throws Exception { 
    final String kafkaSourceTopic = "flinkSourceTest"; 
    final String kafkaSinkTopic = "flinkSinkExactlyTest1"; 
    final String groupId = "ExactlyOnceTest1"; 
    final String brokers = "localhost:9092"; 
    final String ckDir = "file:///Users/test/checkpointDir/" + groupId; 
    final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.enableCheckpointing(60000); 
    
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
       
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig().setCheckpointStorage(ckDir); 
    env.setParallelism(4); 

    KafkaSource<String> source = KafkaSource.<String>builder() 
     .setBootstrapServers(brokers) 
     .setTopics(kafkaSourceTopic) 
     .setGroupId(groupId) 
     .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
     .setValueOnlyDeserializer(new SimpleStringSchema()) 
     .build(); 

    DataStream<String> flintstones = env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "Kafka Source"); 
    DataStream<String> adults = flintstones.filter(s -> s != null && s.length() 
> 2); 
    Properties props = new Properties(); 
    props.setProperty("transaction.timeout.ms", "900000"); 
    adults.sinkTo(KafkaSink.<String>builder() 
    .setBootstrapServers(brokers) 
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) 
    .setTransactionalIdPrefix("tp-test-") 
    .setKafkaProducerConfig(props) 
    .setRecordSerializer(new SelfSerializationSchema(kafkaSinkTopic, new 
SimpleStringSchema())) 
    .build()); 

    env.execute("ScaleDownTest"); 
} 

static class SelfSerializationSchema implements 
KafkaRecordSerializationSchema<String> { private final 
SerializationSchema<String> valueSerialization; private String topic; 
SelfSerializationSchema(String topic, SerializationSchema<String> 
valueSerialization){ this.valueSerialization = valueSerialization; this.topic = 
topic; } @Override public void open(SerializationSchema.InitializationContext 
context, KafkaSinkContext sinkContext) throws Exception { 
KafkaRecordSerializationSchema.super.open(context, sinkContext); } @Override 
public ProducerRecord<byte[], byte[]> serialize(String s, KafkaSinkContext 
kafkaSinkContext, Long aLong) { final byte[] valueSerialized = 
valueSerialization.serialize(s); return new ProducerRecord<>(topic, 
valueSerialized); } } 
}
{code}

  was:
The KafkaSink throws the exception when restarted with a lower parallelism and 
the exactly-once guarantee. The exception is like this.


{noformat}
java.lang.IllegalStateException: Internal error: It is expected that state from 
previous executions is distributed to the same subtask id.   
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)   
at 
org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178)
   
at 
org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130)  
 
at 
org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99) 
  
at 
org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134)
   
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
   
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
   
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)
   
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638)
   
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)   
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)   
at java.lang.Thread.run(Thread.java:748)    Suppressed: 
java.lang.NullPointerException       
at 
org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
       
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927)
       
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797)
        ... 4 more
{noformat}

I start the kafka cluster(kafka_2.13-2.8.0) and the flink cluster in my own 
mac. I change the parallelism from 4 to 2 and restart the job from some 
completed checkpoint. Then the error occurs. 

And the cli command and the code are as follows.
{code:java}
// cli command
./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest -s 
/Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1
 /Users/test/project/self/target/test.jar
{code}
{code:java}
public class KafkaExactlyOnceScaleDownTest { 
public static void main(String[] args) throws Exception { 
    final String kafkaSourceTopic = "flinkSourceTest"; 
    final String kafkaSinkTopic = "flinkSinkExactlyTest1"; 
    final String groupId = "ExactlyOnceTest1"; 
    final String brokers = "localhost:9092"; 
    final String ckDir = "file:///Users/test/checkpointDir/" + groupId; 
    final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.enableCheckpointing(60000); 
    
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
       
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig().setCheckpointStorage(ckDir); 
    env.setParallelism(4); 

    KafkaSource<String> source = KafkaSource.<String>builder() 
     .setBootstrapServers(brokers) 
     .setTopics(kafkaSourceTopic) 
     .setGroupId(groupId) 
     .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
     .setValueOnlyDeserializer(new SimpleStringSchema()) 
     .build(); 

    DataStream<String> flintstones = env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "Kafka Source"); 
    DataStream<String> adults = flintstones.filter(s -> s != null && s.length() 
> 2); 
    Properties props = new Properties(); 
    props.setProperty("transaction.timeout.ms", "900000"); 
    adults.sinkTo(KafkaSink.<String>builder() 
    .setBootstrapServers(brokers) 
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) 
    .setTransactionalIdPrefix("tp-test-") 
    .setKafkaProducerConfig(props) 
    .setRecordSerializer(new SelfSerializationSchema(kafkaSinkTopic, new 
SimpleStringSchema())) 
    .build()); 

    env.execute("ScaleDownTest"); 
} 

static class SelfSerializationSchema implements 
KafkaRecordSerializationSchema<String> { private final 
SerializationSchema<String> valueSerialization; private String topic; 
SelfSerializationSchema(String topic, SerializationSchema<String> 
valueSerialization){ this.valueSerialization = valueSerialization; this.topic = 
topic; } @Override public void open(SerializationSchema.InitializationContext 
context, KafkaSinkContext sinkContext) throws Exception { 
KafkaRecordSerializationSchema.super.open(context, sinkContext); } @Override 
public ProducerRecord<byte[], byte[]> serialize(String s, KafkaSinkContext 
kafkaSinkContext, Long aLong) { final byte[] valueSerialized = 
valueSerialization.serialize(s); return new ProducerRecord<>(topic, 
valueSerialized); } } 
}
{code}


> KafkaSink error when restart from the checkpoint with a lower parallelism by 
> exactly-once guarantee
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23854
>                 URL: https://issues.apache.org/jira/browse/FLINK-23854
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Hang Ruan
>            Priority: Blocker
>              Labels: release-testing
>
> The KafkaSink throws the exception when restarted with a lower parallelism 
> and the exactly-once guarantee. The exception is like this.
> {noformat}
> java.lang.IllegalStateException: Internal error: It is expected that state 
> from previous executions is distributed to the same subtask id.   
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)   
> at 
> org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178)
>    
> at 
> org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130)
>    
> at 
> org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99)
>    
> at 
> org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134)
>    
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>    
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>    
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
>    
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690)
>    
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>    
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666)
>    
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)
>    
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638)
>    
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)   
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)   
> at java.lang.Thread.run(Thread.java:748)    
> Suppressed: java.lang.NullPointerException       
> at 
> org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195)
>        
> at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
>        
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
>        
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028)
>        
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014)
>        
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927)
>        
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797)
>         ... 4 more
> {noformat}
> I start the kafka cluster(kafka_2.13-2.8.0) and the flink cluster in my own 
> mac. I change the parallelism from 4 to 2 and restart the job from some 
> completed checkpoint. Then the error occurs. 
> And the cli command and the code are as follows.
> {code:java}
> // cli command
> ./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest -s 
> /Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1
>  /Users/test/project/self/target/test.jar
> {code}
> {code:java}
> public class KafkaExactlyOnceScaleDownTest { 
> public static void main(String[] args) throws Exception { 
>     final String kafkaSourceTopic = "flinkSourceTest"; 
>     final String kafkaSinkTopic = "flinkSinkExactlyTest1"; 
>     final String groupId = "ExactlyOnceTest1"; 
>     final String brokers = "localhost:9092"; 
>     final String ckDir = "file:///Users/test/checkpointDir/" + groupId; 
>     final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(); 
>     env.enableCheckpointing(60000); 
>     
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     env.getCheckpointConfig().setCheckpointStorage(ckDir); 
>     env.setParallelism(4); 
>     KafkaSource<String> source = KafkaSource.<String>builder() 
>      .setBootstrapServers(brokers) 
>      .setTopics(kafkaSourceTopic) 
>      .setGroupId(groupId) 
>      .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
>      .setValueOnlyDeserializer(new SimpleStringSchema()) 
>      .build(); 
>     DataStream<String> flintstones = env.fromSource(source, 
> WatermarkStrategy.noWatermarks(), "Kafka Source"); 
>     DataStream<String> adults = flintstones.filter(s -> s != null && 
> s.length() > 2); 
>     Properties props = new Properties(); 
>     props.setProperty("transaction.timeout.ms", "900000"); 
>     adults.sinkTo(KafkaSink.<String>builder() 
>     .setBootstrapServers(brokers) 
>     .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) 
>     .setTransactionalIdPrefix("tp-test-") 
>     .setKafkaProducerConfig(props) 
>     .setRecordSerializer(new SelfSerializationSchema(kafkaSinkTopic, new 
> SimpleStringSchema())) 
>     .build()); 
>     env.execute("ScaleDownTest"); 
> } 
> static class SelfSerializationSchema implements 
> KafkaRecordSerializationSchema<String> { private final 
> SerializationSchema<String> valueSerialization; private String topic; 
> SelfSerializationSchema(String topic, SerializationSchema<String> 
> valueSerialization){ this.valueSerialization = valueSerialization; this.topic 
> = topic; } @Override public void 
> open(SerializationSchema.InitializationContext context, KafkaSinkContext 
> sinkContext) throws Exception { 
> KafkaRecordSerializationSchema.super.open(context, sinkContext); } @Override 
> public ProducerRecord<byte[], byte[]> serialize(String s, KafkaSinkContext 
> kafkaSinkContext, Long aLong) { final byte[] valueSerialized = 
> valueSerialization.serialize(s); return new ProducerRecord<>(topic, 
> valueSerialized); } } 
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to