Copilot commented on code in PR #8415: URL: https://github.com/apache/storm/pull/8415#discussion_r2894867802
########## storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metric; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class FileBasedEventLoggerTest { + + private Path tempDir; + private FileBasedEventLogger eventLogger; + + @BeforeEach + public void setUp() throws IOException { + tempDir = Files.createTempDirectory("storm-eventlogger-test"); + eventLogger = new FileBasedEventLogger(); + } + + @AfterEach + public void tearDown() throws IOException { + eventLogger.close(); + if (tempDir != null) { + 
Files.walk(tempDir) + .map(Path::toFile) + .forEach(File::delete); + tempDir.toFile().delete(); + } + } + + private TopologyContext mockTopologyContext() { + TopologyContext context = mock(TopologyContext.class); + when(context.getStormId()).thenReturn("test-topology-1"); + when(context.getThisWorkerPort()).thenReturn(6700); + return context; + } + + @Test + public void testFileRotation() throws IOException, InterruptedException { + Map<String, Object> conf = new HashMap<>(); + conf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, tempDir.toAbsolutePath().toString()); + // We set rotation to be 1MB to trigger it easily, but we'll need to write + // a lot. Alternatively, we can use a very small value, but we need an int >= 1. + // Wait, Config is by MB. If we set it to 1, we still need to write 1MB. + // Let's reflection inject a smaller value for tests? No, Storm uses config. + // We will just use `1` MB and write a large string a few times. + conf.put(Config.TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB, 1); + conf.put(Config.TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES, 2); + + eventLogger.prepare(conf, new HashMap<>(), mockTopologyContext()); + + // 1 MB = 1048576 bytes + // We create an event message that is about 100KB, write it 11 times to exceed 1MB. + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 100_000; i++) { + sb.append("A"); // 1 byte + } + String largeValue = sb.toString(); + + List<Object> values = new ArrayList<>(); + values.add(largeValue); + + // This toString() will add some bytes overhead, so each event is ~ 100KB. + IEventLogger.EventInfo eventInfo = new IEventLogger.EventInfo( + System.currentTimeMillis(), "test-component", 1, "msgId", values); + + // Write 10 times -> ~1 MB + for (int i = 0; i < 10; i++) { + eventLogger.log(eventInfo); + } + + // Wait a bit for flush if any (though rotation is synchronous in write) + Thread.sleep(100); +

Review Comment: The `Thread.sleep(...)` calls make this test slower and can introduce timing-related flakiness.
Rotation in `FileBasedEventLogger.log()` is synchronous, so these sleeps should be removable; if waiting is required, prefer asserting on filesystem state with a bounded retry/poll loop instead of fixed sleeps.

########## storm-client/src/jvm/org/apache/storm/Config.java: ########## @@ -477,6 +477,20 @@ public class Config extends HashMap<String, Object> { @IsInteger @IsPositiveNumber(includeZero = true) public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors"; + /** + * The maximum size in MB for the event logger file before it rotates. + * If not specified, a default of 1000 MB is used.

Review Comment: Javadoc says the default rotation size is 1000 MB, but `FileBasedEventLogger` currently defaults to 100 MB (`DEFAULT_ROTATION_SIZE_MB = 100`). Please align the documentation and implementation so operators aren’t misled about the actual default behavior.

```suggestion
 * If not specified, a default of 100 MB is used.
```

########## storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.storm.metric; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class FileBasedEventLoggerTest { + + private Path tempDir; + private FileBasedEventLogger eventLogger; + + @BeforeEach + public void setUp() throws IOException { + tempDir = Files.createTempDirectory("storm-eventlogger-test"); + eventLogger = new FileBasedEventLogger(); + } + + @AfterEach + public void tearDown() throws IOException { + eventLogger.close(); + if (tempDir != null) { + Files.walk(tempDir) + .map(Path::toFile) + .forEach(File::delete); + tempDir.toFile().delete(); + }

Review Comment: The test cleanup uses `Files.walk(tempDir)` without closing the returned stream and deletes in traversal order, which can fail/flap (especially on Windows) because directories may be deleted before their children and open directory handles can block deletion. Use try-with-resources on the stream and delete paths in reverse order (or use JUnit 5 `@TempDir` to let the framework handle cleanup).
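A minimal sketch of the cleanup suggested above, for illustration only. The class name `TempDirCleanup` and the method `deleteRecursively` are hypothetical and not part of the PR; the sketch assumes plain `java.nio.file` calls and leaves the JUnit 5 `@TempDir` alternative aside.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical test helper (not part of the PR).
final class TempDirCleanup {
    private TempDirCleanup() {
    }

    /** Deletes root and everything beneath it, children before parents. */
    static void deleteRecursively(Path root) throws IOException {
        if (root == null || Files.notExists(root)) {
            return;
        }
        // try-with-resources closes the directory handles held by the stream.
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order sorts each child path ahead of its parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.deleteIfExists(path);
                } catch (IOException e) {
                    // Lambdas cannot throw checked exceptions; unwrap below.
                    throw new UncheckedIOException(e);
                }
            });
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }
}
```

In `tearDown()` this would replace the `Files.walk(...)` chain with a single call such as `TempDirCleanup.deleteRecursively(tempDir)`, made after `eventLogger.close()` so no log file is still open.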
########## storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java: ########## @@ -109,16 +128,47 @@ public void prepare(Map<String, Object> conf, Map<String, Object> arguments, Top @Override public void log(EventInfo event) { try { - //TODO: file rotation - eventLogWriter.write(buildLogMessage(event)); + String logMessage = buildLogMessage(event); + byte[] logBytes = logMessage.getBytes(StandardCharsets.UTF_8); + int writeLength = logBytes.length + System.lineSeparator().length(); +

Review Comment: `log()` allocates a new UTF-8 byte[] for every event just to compute size, which can add significant GC pressure in high-throughput topologies. Consider a lower-allocation approach (e.g., periodically check `Files.size(...)`, maintain a counting output stream, or use a reusable encoder/byte counting strategy) while still keeping rotation accurate.

########## storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java: ########## @@ -82,25 +93,33 @@ public void run() { } }; - flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS);

Review Comment: The scheduled flush task currently throws a `RuntimeException` on `IOException`. In `ScheduledExecutorService`, an exception from the runnable typically suppresses subsequent executions of that periodic task, which can silently disable flushing after a transient filesystem issue. Prefer handling the exception inside the runnable (log and continue, or trigger a controlled shutdown) so the periodic flush behavior remains predictable.

########## storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metric; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue;

Review Comment: Unused static import: `assertEquals` isn’t referenced in this test file. Removing it avoids unnecessary warnings and keeps imports clean.
```suggestion
```

########## storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java: ########## @@ -109,16 +128,47 @@ public void prepare(Map<String, Object> conf, Map<String, Object> arguments, Top @Override public void log(EventInfo event) { try { - //TODO: file rotation - eventLogWriter.write(buildLogMessage(event)); + String logMessage = buildLogMessage(event); + byte[] logBytes = logMessage.getBytes(StandardCharsets.UTF_8); + int writeLength = logBytes.length + System.lineSeparator().length(); + + if (currentFileSize + writeLength > maxFileSize) { + rotateFiles(); + } + + eventLogWriter.write(logMessage); eventLogWriter.newLine(); + currentFileSize += writeLength; dirty = true; } catch (IOException ex) { LOG.error("Error logging event {}", event, ex); throw new RuntimeException(ex); } } + private void rotateFiles() throws IOException { + eventLogWriter.close(); +

Review Comment: `rotateFiles()` closes and reinitializes `eventLogWriter` while a scheduled flush task may be running and `log()` may be writing, but there’s no shared synchronization around writer operations. This can race (flush/write on a closed writer during rotation) and cause intermittent IOExceptions and lost/partial logs. Add a lock around all writer usage (write/flush/close/rotate) or route writes+flush+rotation through a single-thread executor.
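A rough sketch of the first option (one lock around every writer operation), under stated assumptions. `LockedEventWriter` is a hypothetical stand-in, not the PR's `FileBasedEventLogger`; it keeps only a single rotated file (`.1`) and omits the retained-files shifting so that the locking stays in focus.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration: write, flush, rotate and close share one monitor.
final class LockedEventWriter implements AutoCloseable {
    private final Object lock = new Object();
    private final Path logPath;
    private final long maxBytes;
    private BufferedWriter writer; // guarded by lock
    private long currentBytes;     // guarded by lock

    LockedEventWriter(Path logPath, long maxBytes) throws IOException {
        this.logPath = logPath;
        this.maxBytes = maxBytes;
        this.writer = open();
        this.currentBytes = Files.size(logPath);
    }

    private BufferedWriter open() throws IOException {
        return Files.newBufferedWriter(logPath, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    }

    /** Called by the logging thread. */
    void log(String message) throws IOException {
        long length = message.getBytes(StandardCharsets.UTF_8).length
                + System.lineSeparator().length();
        synchronized (lock) {
            if (currentBytes + length > maxBytes) {
                rotate();
            }
            writer.write(message);
            writer.newLine();
            currentBytes += length;
        }
    }

    /** Called by the periodic flush task; never observes a half-rotated writer. */
    void flush() throws IOException {
        synchronized (lock) {
            writer.flush();
        }
    }

    // Caller must hold lock.
    private void rotate() throws IOException {
        writer.close();
        Files.move(logPath, Paths.get(logPath.toString() + ".1"),
                StandardCopyOption.REPLACE_EXISTING);
        writer = open();
        currentBytes = 0;
    }

    @Override
    public void close() throws IOException {
        synchronized (lock) {
            writer.close();
        }
    }
}
```

Because `rotate()` only runs while the monitor is held, a concurrent `flush()` sees either the old writer before it is closed or the new one after it is opened. The single-thread executor alternative mentioned in the comment would reach the same result by serializing the operations as tasks instead.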
########## storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java: ########## @@ -109,16 +128,47 @@ public void prepare(Map<String, Object> conf, Map<String, Object> arguments, Top @Override public void log(EventInfo event) { try { - //TODO: file rotation - eventLogWriter.write(buildLogMessage(event)); + String logMessage = buildLogMessage(event); + byte[] logBytes = logMessage.getBytes(StandardCharsets.UTF_8); + int writeLength = logBytes.length + System.lineSeparator().length(); + + if (currentFileSize + writeLength > maxFileSize) { + rotateFiles(); + } + + eventLogWriter.write(logMessage); eventLogWriter.newLine(); + currentFileSize += writeLength; dirty = true; } catch (IOException ex) { LOG.error("Error logging event {}", event, ex); throw new RuntimeException(ex); } } + private void rotateFiles() throws IOException { + eventLogWriter.close(); + + // Shift existing rotated files + for (int i = maxRetainedFiles - 1; i >= 1; i--) { + Path src = Paths.get(eventLogPath.toString() + "." + i); + Path dst = Paths.get(eventLogPath.toString() + "." + (i + 1)); + if (Files.exists(src)) { + Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING); + } + }

Review Comment: Rotation doesn’t proactively remove files beyond `maxRetainedFiles` if they already exist (e.g., when the retention config is reduced, older `events.log.N` files where N > maxRetainedFiles may remain on disk indefinitely). Consider deleting `events.log.(maxRetainedFiles+1)` (and higher) before/after shifting so the retention limit is always enforced.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
