[ https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515792#comment-15515792 ]
Harald Kirsch edited comment on KAFKA-2170 at 9/23/16 8:37 AM:
---------------------------------------------------------------
Better, but not quite there yet, it seems. Here is what I did.
I applied the .diff file of pull request 1757, as downloaded from GitHub,
with the patch command to 106a7456060750ab0604d290b8c1e33a57adbf20 from
http://git-wip-us.apache.org/repos/asf/kafka.git. It applied just fine.
I built the tgz, put it on my test machine, and ran Kafka with these settings:
{code}
log.segment.bytes=6000111
log.cleaner.enable=true
log.cleanup.policy=compact
log.cleaner.min.cleanable.ratio=0.01
log.cleaner.backoff.ms=15000
log.segment.delete.delay.ms=600
auto.create.topics.enable=false
{code}
I loaded around 75 files into Kafka as binary blobs between 10k and 6M in
size, twice. The cleanup triggered and worked just fine.
I tried this a few more times, also loading the files in quick succession,
and it worked just fine.
I stopped Kafka with CTRL-C; it shut down nicely and restarted (mentioning this
just for completeness, not sure whether it is relevant).
I tried this a few more times, loading the files roughly 15 times in quick
succession, and it bombed out as follows:
{code}
kafka.common.KafkaStorageException: Failed to change the log file suffix from to .deleted for log segment 695
    at kafka.log.LogSegment.kafkaStorageException$1(LogSegment.scala:327)
    at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:329)
    at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:956)
    at kafka.log.Log$$anonfun$replaceSegments$1.apply(Log.scala:1002)
    at kafka.log.Log$$anonfun$replaceSegments$1.apply(Log.scala:997)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at kafka.log.Log.replaceSegments(Log.scala:997)
    at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:425)
    at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:364)
    at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at kafka.log.Cleaner.clean(LogCleaner.scala:363)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:239)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:218)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.file.FileAlreadyExistsException: C:\Users\hk\tmp\kafka-data\hktest-0\00000000000000000695.log -> C:\Users\hk\tmp\kafka-data\hktest-0\00000000000000000695.log.deleted
    at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:81)
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
    at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
    at java.nio.file.Files.move(Files.java:1395)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:670)
    at kafka.log.FileMessageSet.renameTo(FileMessageSet.scala:431)
    ... 14 more
    Suppressed: java.nio.file.AccessDeniedException: C:\Users\hk\tmp\kafka-data\hktest-0\00000000000000000695.log -> C:\Users\hk\tmp\kafka-data\hktest-0\00000000000000000695.log.deleted
        at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83)
        at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
        at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
        at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
        at java.nio.file.Files.move(Files.java:1395)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:667)
        ... 15 more
[2016-09-23 10:12:36,254] INFO [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner)
{code}
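A note on reading the trace: Utils.atomicMoveWithFallback shows up twice, at line 667 under the suppressed AccessDeniedException and at line 670 under the FileAlreadyExistsException. That is consistent with an atomic move being tried first and a second move being tried as a fallback, with the first failure attached as suppressed. Below is a minimal sketch of that pattern, not the patched Kafka code: the class and method names are hypothetical, and the plain (non-replacing) fallback move is an assumption, since the copy options actually used are not visible in the trace.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class MoveWithFallback {
    // Hypothetical helper: try an atomic rename first, then fall back to a
    // plain move. If both fail, the first failure is kept as a suppressed
    // exception on the second, which is the shape seen in the trace.
    static void moveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                // Assumption: no REPLACE_EXISTING here, so an existing
                // target makes this call fail as well.
                Files.move(source, target);
            } catch (IOException fallbackFailure) {
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rename-demo");
        Path segment = Files.write(dir.resolve("00000000000000000695.log"), new byte[] {1, 2, 3});
        Path deleted = dir.resolve("00000000000000000695.log.deleted");
        moveWithFallback(segment, deleted);
        System.out.println("renamed: " + Files.exists(deleted));
    }
}
```

With no open handle on the source file the rename succeeds on the first attempt; the failure in the trace would need something still holding the segment file on Windows.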
Afterwards I restarted Kafka, which worked without complaints. I ran one round
on the 75 files and the log cleaner cleaned up just fine, so it looks like
you're pretty close to a working solution.
Let me know if I need to provide more information or run a different experiment.
> 10 LogTest cases failed for file.renameTo failed under windows
> ---------------------------------------------------------------
>
> Key: KAFKA-2170
> URL: https://issues.apache.org/jira/browse/KAFKA-2170
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 0.10.1.0
> Environment: Windows
> Reporter: Honghai Chen
> Assignee: Jay Kreps
>
> Get the latest code from trunk, then run the test:
> gradlew -i core:test --tests kafka.log.LogTest
> 10 cases failed for the same reason:
> kafka.common.KafkaStorageException: Failed to change the log file suffix from to .deleted for log segment 0
>     at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259)
>     at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756)
>     at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747)
>     at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
>     at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at kafka.log.Log.deleteOldSegments(Log.scala:514)
>     at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41)
>     at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:220)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>     at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>     at $Proxy2.processTestClass(Unknown Source)
>     at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
>     at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
>     at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
>
>
> testCompactedTopicConstraints
> java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
>     at java.io.RandomAccessFile.setLength(Native Method)
>     at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:285)
>     at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
>     at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
>     at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
>     at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
>     at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
>     at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
>     at kafka.log.LogSegment.recover(LogSegment.scala:198)
>     at kafka.log.Log.recoverLog(Log.scala:238)
>     at kafka.log.Log.loadSegments(Log.scala:210)
>     at kafka.log.Log.<init>(Log.scala:83)
>     at kafka.log.LogTest.testCompactedTopicConstraints(LogTest.scala:370)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41)
>     at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:220)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>     at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>     at $Proxy2.processTestClass(Unknown Source)
>     at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
>     at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
>     at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
> LogTest.testAsyncDelete
> LogTest.testCompactedTopicConstraints
> LogTest.testCorruptLog
> LogTest.testIndexRebuild
> LogTest.testIndexResizingAtTruncation
> LogTest.testLogRecoversToCorrectOffset
> LogTest.testOpenDeletesObsoleteFiles
> LogTest.testReopenThenTruncate
> LogTest.testThatGarbageCollectingSegmentsDoesntChangeOffset
> LogTest.testTruncateTo
> Do we need to call log.close, then rename the file, and then reopen it?
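The close, rename, reopen sequence asked about in the description can be sketched with plain java.nio as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, it is not how Kafka's Log or LogSegment is implemented, and it does not cover memory-mapped index files, whose mappings may outlive the channel they were created from.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class CloseRenameReopen {
    // Hypothetical helper: release the handle, rename the file, then open a
    // fresh channel on the new name.
    static FileChannel renameClosed(FileChannel open, Path source, Path target) throws IOException {
        open.close();               // release the handle before renaming
        Files.move(source, target); // plain rename; fails if target exists
        return FileChannel.open(target, StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("segment-demo");
        Path log = dir.resolve("00000000000000000000.log");
        FileChannel channel = FileChannel.open(log,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        channel.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

        Path deleted = dir.resolve("00000000000000000000.log.deleted");
        try (FileChannel reopened = renameClosed(channel, log, deleted)) {
            System.out.println("size after rename: " + reopened.size());
        }
    }
}
```

Whether closing alone is enough on Windows depends on what else still holds the file, so this answers the question only for the simple open-channel case.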