Copilot commented on code in PR #8415:
URL: https://github.com/apache/storm/pull/8415#discussion_r2894867802


##########
storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metric;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FileBasedEventLoggerTest {
+
+    private Path tempDir;
+    private FileBasedEventLogger eventLogger;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        tempDir = Files.createTempDirectory("storm-eventlogger-test");
+        eventLogger = new FileBasedEventLogger();
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        eventLogger.close();
+        if (tempDir != null) {
+            Files.walk(tempDir)
+                    .map(Path::toFile)
+                    .forEach(File::delete);
+            tempDir.toFile().delete();
+        }
+    }
+
+    private TopologyContext mockTopologyContext() {
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getStormId()).thenReturn("test-topology-1");
+        when(context.getThisWorkerPort()).thenReturn(6700);
+        return context;
+    }
+
+    @Test
+    public void testFileRotation() throws IOException, InterruptedException {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, tempDir.toAbsolutePath().toString());
+        // We set rotation to be 1MB to trigger it easily, but we'll need to write
+        // a lot. Alternatively, we can use a very small value, but we need an int >= 1.
+        // Wait, Config is by MB. If we set it to 1, we still need to write 1MB.
+        // Let's reflection inject a smaller value for tests? No, Storm uses config.
+        // We will just use `1` MB and write a large string a few times.
+        conf.put(Config.TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB, 1);
+        conf.put(Config.TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES, 2);
+
+        eventLogger.prepare(conf, new HashMap<>(), mockTopologyContext());
+
+        // 1 MB = 1048576 bytes
+        // We create an event message that is about 100KB, write it 11 times to exceed 1MB.
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 100_000; i++) {
+            sb.append("A"); // 1 byte
+        }
+        String largeValue = sb.toString();
+
+        List<Object> values = new ArrayList<>();
+        values.add(largeValue);
+        
+        // This toString() will add some bytes overhead, so each event is ~ 100KB.
+        IEventLogger.EventInfo eventInfo = new IEventLogger.EventInfo(
+                System.currentTimeMillis(), "test-component", 1, "msgId", values);
+
+        // Write 10 times -> ~1 MB
+        for (int i = 0; i < 10; i++) {
+            eventLogger.log(eventInfo);
+        }
+
+        // Wait a bit for flush if any (though rotation is synchronous in write)
+        Thread.sleep(100);
+

Review Comment:
   The `Thread.sleep(...)` calls make this test slower and can introduce 
timing-related flakiness. Rotation in `FileBasedEventLogger.log()` is 
synchronous, so these sleeps should be removable; if waiting is required, 
prefer asserting on filesystem state with a bounded retry/poll loop instead of 
fixed sleeps.
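
A bounded retry/poll loop of the kind suggested above could look roughly like the following. This is only a sketch: `PollUtil` and `waitFor` are hypothetical names introduced for illustration and are not part of Storm or JUnit.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical test helper (not part of Storm or JUnit).
final class PollUtil {
    private PollUtil() {
    }

    /**
     * Re-checks the condition every pollMillis until it holds or the timeout
     * elapses. Returns true as soon as the condition holds, false on timeout.
     */
    static boolean waitFor(BooleanSupplier condition, Duration timeout, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollMillis);
        }
    }
}
```

In the test, a call such as `assertTrue(PollUtil.waitFor(() -> Files.exists(rotatedFile), Duration.ofSeconds(2), 10))` would return as soon as the file appears, where `rotatedFile` stands in for whatever path the test expects. If rotation is in fact synchronous, the plain assertion without any waiting should be enough.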



##########
storm-client/src/jvm/org/apache/storm/Config.java:
##########
@@ -477,6 +477,20 @@ public class Config extends HashMap<String, Object> {
     @IsInteger
     @IsPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
+    /**
+     * The maximum size in MB for the event logger file before it rotates.
+     * If not specified, a default of 1000 MB is used.

Review Comment:
   Javadoc says the default rotation size is 1000 MB, but 
`FileBasedEventLogger` currently defaults to 100 MB (`DEFAULT_ROTATION_SIZE_MB 
= 100`). Please align the documentation and implementation so operators aren’t 
misled about the actual default behavior.
   ```suggestion
        * If not specified, a default of 100 MB is used.
   ```



##########
storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metric;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FileBasedEventLoggerTest {
+
+    private Path tempDir;
+    private FileBasedEventLogger eventLogger;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        tempDir = Files.createTempDirectory("storm-eventlogger-test");
+        eventLogger = new FileBasedEventLogger();
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        eventLogger.close();
+        if (tempDir != null) {
+            Files.walk(tempDir)
+                    .map(Path::toFile)
+                    .forEach(File::delete);
+            tempDir.toFile().delete();
+        }

Review Comment:
   The test cleanup uses `Files.walk(tempDir)` without closing the returned 
stream and deletes in traversal order, which can fail intermittently 
(especially on Windows) because parent directories are visited before their 
children and open directory handles can block deletion. Use try-with-resources 
on the stream and delete paths in reverse order (or use JUnit 5 `@TempDir` to 
let the framework handle cleanup).
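
As a sketch of the try-with-resources plus reverse-order approach, assuming a small hypothetical helper class (`TempDirCleanup` is not an existing Storm utility):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper for test cleanup.
final class TempDirCleanup {
    private TempDirCleanup() {
    }

    /** Deletes root and everything beneath it; does nothing if root is absent. */
    static void deleteRecursively(Path root) throws IOException {
        if (root == null || !Files.exists(root)) {
            return;
        }
        // Closing the stream releases the directory handles opened by walk().
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse path order puts children ahead of their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```

Unlike `File::delete`, `Files.delete` throws when a path cannot be removed, so a cleanup failure surfaces in the test run instead of being ignored.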



##########
storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java:
##########
@@ -109,16 +128,47 @@ public void prepare(Map<String, Object> conf, Map<String, Object> arguments, Top
     @Override
     public void log(EventInfo event) {
         try {
-            //TODO: file rotation
-            eventLogWriter.write(buildLogMessage(event));
+            String logMessage = buildLogMessage(event);
+            byte[] logBytes = logMessage.getBytes(StandardCharsets.UTF_8);
+            int writeLength = logBytes.length + System.lineSeparator().length();
+

Review Comment:
   `log()` allocates a new UTF-8 byte[] for every event just to compute size, 
which can add significant GC pressure in high-throughput topologies. Consider a 
lower-allocation approach (e.g., periodically check `Files.size(...)`, maintain 
a counting output stream, or use a reusable encoder/byte counting strategy) 
while still keeping rotation accurate.
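
One of the options mentioned above, a counting output stream, might be sketched as follows. `CountingOutputStream` here is a hypothetical class written for illustration; it is not claimed to exist in Storm, and how it would be wired into `FileBasedEventLogger` is left open.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical wrapper that tracks how many bytes reached the underlying stream.
final class CountingOutputStream extends FilterOutputStream {
    private long count;

    CountingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        count++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Bypass FilterOutputStream's byte-at-a-time default implementation.
        out.write(b, off, len);
        count += len;
    }

    /** Bytes written through this stream so far. */
    long getCount() {
        return count;
    }
}
```

With a `BufferedWriter` layered on top, the count only advances when the writer's buffer is drained, so a rotation check based on it would lag behind by up to one buffer unless the writer is flushed first. Whether that slack is acceptable is a design decision for the PR.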



##########
storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java:
##########
@@ -82,25 +93,33 @@ public void run() {
             }
         };
 
-        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS,
+                TimeUnit.MILLISECONDS);

Review Comment:
   The scheduled flush task currently throws a `RuntimeException` on 
`IOException`. With `ScheduledExecutorService.scheduleAtFixedRate`, an 
exception thrown by the runnable suppresses all subsequent executions of that 
periodic task, which can silently disable flushing after a transient 
filesystem issue. Prefer handling the exception inside the runnable (log and 
continue, or trigger a controlled shutdown) so the periodic flush behavior 
remains predictable.
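
A minimal sketch of the "log and continue" variant, assuming a hypothetical `SafeFlushTask` wrapper and `Flusher` callback (neither exists in the PR; the real code would log through its SLF4J `LOG` rather than `System.err`):

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: keeps a periodic flush alive across transient failures.
final class SafeFlushTask implements Runnable {

    /** Hypothetical callback standing in for the real flush logic. */
    interface Flusher {
        void flush() throws IOException;
    }

    private final Flusher flusher;
    private final AtomicInteger failures = new AtomicInteger();

    SafeFlushTask(Flusher flusher) {
        this.flusher = flusher;
    }

    @Override
    public void run() {
        try {
            flusher.flush();
        } catch (IOException | RuntimeException ex) {
            // Swallow and record, so the scheduler keeps invoking the task.
            failures.incrementAndGet();
            System.err.println("Flush failed, will retry on next tick: " + ex);
        }
    }

    /** Number of flush attempts that failed so far. */
    int failureCount() {
        return failures.get();
    }
}
```

Passing `new SafeFlushTask(...)` to `scheduleAtFixedRate` instead of the raw runnable means a failed flush is retried on the next interval. The failure counter is an illustrative extra that could feed a threshold for the controlled-shutdown option.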



##########
storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metric;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;

Review Comment:
   Unused static import: `assertEquals` isn’t referenced in this test file. 
Removing it avoids unnecessary warnings and keeps imports clean.
   ```suggestion
   
   ```



##########
storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java:
##########
@@ -109,16 +128,47 @@ public void prepare(Map<String, Object> conf, Map<String, Object> arguments, Top
     @Override
     public void log(EventInfo event) {
         try {
-            //TODO: file rotation
-            eventLogWriter.write(buildLogMessage(event));
+            String logMessage = buildLogMessage(event);
+            byte[] logBytes = logMessage.getBytes(StandardCharsets.UTF_8);
+            int writeLength = logBytes.length + System.lineSeparator().length();
+
+            if (currentFileSize + writeLength > maxFileSize) {
+                rotateFiles();
+            }
+
+            eventLogWriter.write(logMessage);
             eventLogWriter.newLine();
+            currentFileSize += writeLength;
             dirty = true;
         } catch (IOException ex) {
             LOG.error("Error logging event {}", event, ex);
             throw new RuntimeException(ex);
         }
     }
 
+    private void rotateFiles() throws IOException {
+        eventLogWriter.close();
+

Review Comment:
   `rotateFiles()` closes and reinitializes `eventLogWriter` while a scheduled 
flush task may be running and `log()` may be writing, but there’s no shared 
synchronization around writer operations. This can race (flush/write on a 
closed writer during rotation) and cause intermittent IOExceptions and 
lost/partial logs. Add a lock around all writer usage 
(write/flush/close/rotate) or route writes+flush+rotation through a 
single-thread executor.
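
The lock-based option could be sketched like this. `LockedLogWriter` is a hypothetical, simplified stand-in for the writer handling in `FileBasedEventLogger`; it only illustrates putting write, flush, rotate and close under the same monitor.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical simplified writer: every operation on the writer takes one lock.
final class LockedLogWriter implements AutoCloseable {
    private final Object lock = new Object();
    private final Path path;
    private BufferedWriter writer;

    LockedLogWriter(Path path) throws IOException {
        this.path = path;
        this.writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
    }

    void writeLine(String line) throws IOException {
        synchronized (lock) {
            writer.write(line);
            writer.newLine();
        }
    }

    void flush() throws IOException {
        synchronized (lock) {
            writer.flush();
        }
    }

    /** Closes the current file, moves it to rotated, and reopens a fresh file. */
    void rotateTo(Path rotated) throws IOException {
        synchronized (lock) {
            writer.close();
            Files.move(path, rotated, StandardCopyOption.REPLACE_EXISTING);
            writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
        }
    }

    @Override
    public void close() throws IOException {
        synchronized (lock) {
            writer.close();
        }
    }
}
```

Because the scheduled flush and `log()` would both go through the same lock, a flush can no longer observe a writer that rotation has closed but not yet replaced.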



##########
storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java:
##########
@@ -109,16 +128,47 @@ public void prepare(Map<String, Object> conf, Map<String, Object> arguments, Top
     @Override
     public void log(EventInfo event) {
         try {
-            //TODO: file rotation
-            eventLogWriter.write(buildLogMessage(event));
+            String logMessage = buildLogMessage(event);
+            byte[] logBytes = logMessage.getBytes(StandardCharsets.UTF_8);
+            int writeLength = logBytes.length + System.lineSeparator().length();
+
+            if (currentFileSize + writeLength > maxFileSize) {
+                rotateFiles();
+            }
+
+            eventLogWriter.write(logMessage);
             eventLogWriter.newLine();
+            currentFileSize += writeLength;
             dirty = true;
         } catch (IOException ex) {
             LOG.error("Error logging event {}", event, ex);
             throw new RuntimeException(ex);
         }
     }
 
+    private void rotateFiles() throws IOException {
+        eventLogWriter.close();
+
+        // Shift existing rotated files
+        for (int i = maxRetainedFiles - 1; i >= 1; i--) {
+            Path src = Paths.get(eventLogPath.toString() + "." + i);
+            Path dst = Paths.get(eventLogPath.toString() + "." + (i + 1));
+            if (Files.exists(src)) {
+                Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
+            }
+        }

Review Comment:
   Rotation doesn’t proactively remove files beyond `maxRetainedFiles` if they 
already exist (e.g., when the retention config is reduced, older `events.log.N` 
files where N > maxRetainedFiles may remain on disk indefinitely). Consider 
deleting `events.log.(maxRetainedFiles+1)` (and higher) before/after shifting 
so the retention limit is always enforced.
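
A possible shape for that pruning step, assuming rotated files are named `<log file name>.<N>` as in the diff. `RetentionPruner` and `pruneBeyond` are hypothetical names, not part of the PR.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper that removes rotated files above the retention limit.
final class RetentionPruner {
    private RetentionPruner() {
    }

    /**
     * Deletes siblings of logPath named "<name>.<N>" where N exceeds
     * maxRetainedFiles. Returns the number of files deleted.
     */
    static int pruneBeyond(Path logPath, int maxRetainedFiles) throws IOException {
        Path dir = logPath.toAbsolutePath().getParent();
        String prefix = logPath.getFileName().toString() + ".";
        int deleted = 0;
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                if (!name.startsWith(prefix)) {
                    continue;
                }
                String suffix = name.substring(prefix.length());
                // Only purely numeric suffixes count as rotated files.
                if (!suffix.matches("\\d{1,9}")) {
                    continue;
                }
                if (Integer.parseInt(suffix) > maxRetainedFiles
                        && Files.deleteIfExists(entry)) {
                    deleted++;
                }
            }
        }
        return deleted;
    }
}
```

Calling something like `pruneBeyond(eventLogPath, maxRetainedFiles)` at the start or end of `rotateFiles()` would also clean up leftovers after the retention setting is lowered. Scanning the directory on every rotation adds some I/O, so running it once in `prepare()` may be a reasonable alternative.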



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
