[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108748#comment-16108748 ]

Nico Kruber commented on FLINK-6996:
------------------------------------

I got another incarnation (seen only once) with a different failure. The only 
change in that run is switching from {{HeapMemorySegment}} to 
{{HybridMemorySegment}}, but since the memory type was not changed (still 
on-heap by default), this should not be related.

{code}
09:02:49,616 ERROR org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase  - 
--------------------------------------------------------------------------------
Test testOneToOneAtLeastOnceCustomOperator(org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase) failed with:
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:44)
        at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:198)
        at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:89)
        at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.log.LogCleaner.<init>(LogCleaner.scala:89)
        at kafka.log.LogManager.<init>(LogManager.scala:72)
        at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:648)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:208)
        at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:433)
        at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.restartBroker(KafkaTestEnvironmentImpl.java:181)
        at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:282)
        at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceCustomOperator(KafkaProducerTestBase.java:212)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
{code}

https://transfer.sh/H7pW5/369.3.tar.gz
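
The heap blows up while the broker restarted by {{restartBroker()}} constructs its {{LogCleaner}}: each cleaner thread pre-allocates a dedupe {{ByteBuffer}} for {{SkimpyOffsetMap}}, 128 MB by default. A minimal sketch of broker property overrides that would keep this allocation small in tests, assuming the standard Kafka 0.10 broker keys {{log.cleaner.dedupe.buffer.size}} and {{log.cleaner.enable}}; whether and where {{KafkaTestEnvironmentImpl}} accepts such overrides is not shown here, and the helper class below is hypothetical:

{code}
import java.util.Properties;

/**
 * Hypothetical helper: broker property overrides that keep the embedded
 * Kafka broker's LogCleaner from pre-allocating large dedupe buffers.
 * The keys are standard Kafka 0.10 broker settings.
 */
public class LowMemoryBrokerConfig {

    public static Properties lowMemoryOverrides() {
        Properties props = new Properties();
        // Each cleaner thread allocates a ByteBuffer of roughly
        // dedupe.buffer.size / numThreads in SkimpyOffsetMap; the 128 MB
        // default is the allocation failing in the trace above. A couple of
        // MB is enough for tests (Kafka requires at least 1 MB per thread).
        props.setProperty("log.cleaner.dedupe.buffer.size", String.valueOf(2 * 1024 * 1024));
        // Or skip the cleaner entirely if the tests do not rely on compacted topics.
        props.setProperty("log.cleaner.enable", "false");
        return props;
    }
}
{code}

Either knob on its own would avoid the 128 MB allocation per broker restart; this only illustrates the settings involved.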

> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --------------------------------------------------------------
>
>                 Key: FLINK-6996
>                 URL: https://issues.apache.org/jira/browse/FLINK-6996
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.2
>
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that, when it is used like a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]),
> it will not flush the data on "snapshotState" as it is supposed to.
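
For illustration only, a minimal sketch of the flush-on-snapshot pattern the description refers to, assuming Flink's {{CheckpointedFunction}} interface and a plain {{KafkaProducer}}. {{FlushingKafkaSink}} is a hypothetical class, not the actual {{FlinkKafkaProducer010}} fix:

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Hypothetical sink sketching the flush-on-snapshot pattern: implementing
 * CheckpointedFunction lets the sink flush all pending Kafka records before a
 * checkpoint completes, which is what at-least-once requires.
 * producerConfig is expected to contain bootstrap.servers and byte[] serializers.
 */
public class FlushingKafkaSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final String topic;
    private final Properties producerConfig;
    private transient KafkaProducer<byte[], byte[]> producer;

    public FlushingKafkaSink(String topic, Properties producerConfig) {
        this.topic = topic;
        this.producerConfig = producerConfig;
    }

    @Override
    public void open(Configuration parameters) {
        producer = new KafkaProducer<>(producerConfig);
    }

    @Override
    public void invoke(String value) {
        producer.send(new ProducerRecord<>(topic, value.getBytes()));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Block until every record handed to the producer has been acknowledged;
        // without this, records still buffered in the producer can be lost on failure
        // even though the checkpoint that "covers" them succeeds.
        producer.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // No state to restore; the sink only needs the pre-checkpoint flush.
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }
}
{code}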



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
