This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new e722c2fda Implement size-based file rotation for FileBasedEventLogger #8415
e722c2fda is described below

commit e722c2fdadbe8153746f7f1d9fccc8d1d66187e2
Author: ANKIT KUMAR <[email protected]>
AuthorDate: Mon Mar 9 19:07:19 2026 +0530

    Implement size-based file rotation for FileBasedEventLogger #8415
---
 storm-client/src/jvm/org/apache/storm/Config.java  |  14 ++
 .../apache/storm/metric/FileBasedEventLogger.java  | 116 +++++++++++++----
 .../storm/metric/FileBasedEventLoggerTest.java     | 144 +++++++++++++++++++++
 3 files changed, 250 insertions(+), 24 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index a098e6cb6..e332b726b 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -477,6 +477,20 @@ public class Config extends HashMap<String, Object> {
     @IsInteger
     @IsPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
+    /**
+     * The size threshold, in MB, at which the file based event logger rotates its events.log file: a write that would
+     * push the active file past this size first moves it aside. If not specified, a default of 100 MB is used.
+     */
+    @IsInteger
+    @IsPositiveNumber
+    public static final String TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB = "topology.eventlogger.rotation.size.mb";
+    /**
+     * The number of rotated event log files (events.log.1 ... events.log.N) kept in addition to the active events.log;
+     * older rotated files are deleted on rotation. If not specified, a default of 5 is used.
+     */
+    @IsInteger
+    @IsPositiveNumber
+    public static final String TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES = "topology.eventlogger.max.retained.files";
     /**
      * The maximum amount of time given to the topology to fully process a message emitted by a spout. If the message is not acked within
      * this time frame, Storm will fail the message on the spout. Some spouts implementations will then replay the message at a later time.
diff --git a/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java b/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
index 64ee85cd8..f626b8396 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java
@@ -19,21 +19,23 @@
 package org.apache.storm.metric;
 
 import java.io.BufferedWriter;
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import org.apache.storm.Config;
 import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,56 +43,74 @@ public class FileBasedEventLogger implements IEventLogger {
     private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);
 
     private static final int FLUSH_INTERVAL_MILLIS = 1000;
+    private static final long BYTES_PER_MB = 1024L * 1024L;
+    private static final int DEFAULT_ROTATION_SIZE_MB = 100;
+    private static final int DEFAULT_MAX_RETAINED_FILES = 5;
 
     private Path eventLogPath;
     private BufferedWriter eventLogWriter;
     private ScheduledExecutorService flushScheduler;
     private volatile boolean dirty = false;
+    private final Object writeLock = new Object();
+
+    // File rotation configs
+    private long maxFileSize;
+    private int maxRetainedFiles;
+    private long currentFileSize = 0;
 
     private void initLogWriter(Path logFilePath) {
         try {
             LOG.info("logFilePath {}", logFilePath);
             eventLogPath = logFilePath;
+
+            currentFileSize = Files.exists(eventLogPath) ? Files.size(eventLogPath) : 0L;
+
             eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
-                                                     StandardOpenOption.WRITE, StandardOpenOption.APPEND);
+                    StandardOpenOption.WRITE, StandardOpenOption.APPEND);
         } catch (IOException e) {
             LOG.error("Error setting up FileBasedEventLogger.", e);
             throw new RuntimeException(e);
         }
     }
 
-
     private void setUpFlushTask() {
         ThreadFactory threadFactory = new ThreadFactoryBuilder()
-            .setNameFormat("event-logger-flush-%d")
-            .setDaemon(true)
-            .build();
+                .setNameFormat("event-logger-flush-%d")
+                .setDaemon(true)
+                .build();
 
         flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
         Runnable runnable = new Runnable() {
             @Override
             public void run() {
                 try {
-                    if (dirty) {
-                        eventLogWriter.flush();
-                        dirty = false;
+                    synchronized (writeLock) {
+                        if (dirty && eventLogWriter != null) {
+                            eventLogWriter.flush();
+                            dirty = false;
+                        }
                     }
-                } catch (IOException ex) {
+                } catch (Exception ex) {
                     LOG.error("Error flushing " + eventLogPath, ex);
-                    throw new RuntimeException(ex);
                 }
             }
         };
 
-        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS,
+                TimeUnit.MILLISECONDS);
     }
 
-
     @Override
     public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
         String stormId = context.getStormId();
         int port = context.getThisWorkerPort();
 
+        int rotationSizeMb = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB),
+                DEFAULT_ROTATION_SIZE_MB);
+        this.maxFileSize = rotationSizeMb * BYTES_PER_MB;
+        this.maxRetainedFiles = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES),
+                DEFAULT_MAX_RETAINED_FILES);
+
         /*
          * Include the topology name & worker port in the file name so that
          * multiple event loggers can log independently.
@@ -98,9 +118,11 @@ public class FileBasedEventLogger implements IEventLogger {
         String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);
 
         Path path = Paths.get(workersArtifactRoot, "events.log");
-        File dir = path.toFile().getParentFile();
-        if (!dir.exists()) {
-            dir.mkdirs();
+        try {
+            Files.createDirectories(path.getParent());
+        } catch (IOException e) {
+            LOG.error("Failed to create directories for event logger", e);
+            throw new RuntimeException(e);
         }
         initLogWriter(path);
         setUpFlushTask();
@@ -109,30 +131,90 @@ public class FileBasedEventLogger implements IEventLogger {
     @Override
     public void log(EventInfo event) {
         try {
-            //TODO: file rotation
-            eventLogWriter.write(buildLogMessage(event));
-            eventLogWriter.newLine();
-            dirty = true;
+            String logMessage = buildLogMessage(event);
+            // Count UTF-8 bytes (the writer's charset) rather than chars so the size limit also
+            // holds for non-ASCII events; the line separator is always ASCII.
+            int writeLength = logMessage.getBytes(StandardCharsets.UTF_8).length + System.lineSeparator().length();
+
+            synchronized (writeLock) {
+                if (eventLogWriter == null) {
+                    return; // close() has already been called
+                }
+                // Never rotate an empty file: that would only retire an empty events.log
+                // (e.g. when the first event written to a fresh file is larger than the limit).
+                if (currentFileSize > 0 && currentFileSize + writeLength > maxFileSize) {
+                    rotateFiles();
+                }
+
+                eventLogWriter.write(logMessage);
+                eventLogWriter.newLine();
+                currentFileSize += writeLength;
+                dirty = true;
+            }
         } catch (IOException ex) {
             LOG.error("Error logging event {}", event, ex);
             throw new RuntimeException(ex);
         }
     }
 
+    /**
+     * Rotates the event log. Must be called while holding {@code writeLock}.
+     *
+     * <p>Rotated files numbered {@code maxRetainedFiles} and up (stopping at the first missing index) are deleted,
+     * {@code events.log.K} is renamed to {@code events.log.(K+1)}, the active {@code events.log} becomes
+     * {@code events.log.1}, and the writer is reopened on {@code events.log}.
+     */
+    private void rotateFiles() throws IOException {
+        try {
+            eventLogWriter.close();
+
+            // Delete any files that exceed maxRetainedFiles (e.g. if the config was lowered)
+            for (int i = maxRetainedFiles; Files.exists(rotatedPath(i)); i++) {
+                Files.delete(rotatedPath(i));
+            }
+
+            // Shift existing rotated files up by one
+            for (int i = maxRetainedFiles - 1; i >= 1; i--) {
+                Path src = rotatedPath(i);
+                if (Files.exists(src)) {
+                    Files.move(src, rotatedPath(i + 1), StandardCopyOption.REPLACE_EXISTING);
+                }
+            }
+
+            // Rename current events.log
+            if (Files.exists(eventLogPath)) {
+                Files.move(eventLogPath, rotatedPath(1), StandardCopyOption.REPLACE_EXISTING);
+            }
+        } finally {
+            // Reopen even if a delete/rename failed so later writes do not hit a closed stream;
+            // initLogWriter recomputes currentFileSize from the file now at eventLogPath.
+            initLogWriter(eventLogPath);
+        }
+    }
+
+    /** Returns the path of the rotated event log with the given index, e.g. {@code events.log.1}. */
+    private Path rotatedPath(int index) {
+        return Paths.get(eventLogPath.toString() + "." + index);
+    }
+
     protected String buildLogMessage(EventInfo event) {
         return event.toString();
     }
 
     @Override
     public void close() {
-        try {
-            eventLogWriter.close();
+        closeFlushScheduler();
 
+        try {
+            synchronized (writeLock) {
+                if (eventLogWriter != null) {
+                    eventLogWriter.close();
+                    eventLogWriter = null;
+                }
+            }
         } catch (IOException ex) {
             LOG.error("Error closing event log.", ex);
         }
-
-        closeFlushScheduler();
     }
 
     private void closeFlushScheduler() {
diff --git a/storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java b/storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java
new file mode 100644
index 000000000..1db39b4ac
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/metric/FileBasedEventLoggerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metric;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FileBasedEventLoggerTest {
+
+    private Path tempDir;
+    private FileBasedEventLogger eventLogger;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        tempDir = Files.createTempDirectory("storm-eventlogger-test");
+        eventLogger = new FileBasedEventLogger();
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        eventLogger.close();
+        if (tempDir != null) {
+            // Walk in reverse order so files are deleted before their parent directories;
+            // the stream returned by Files.walk holds open directory handles and must be closed.
+            try (Stream<Path> paths = Files.walk(tempDir)) {
+                paths.sorted(Comparator.reverseOrder())
+                        .map(Path::toFile)
+                        .forEach(File::delete);
+            }
+        }
+    }
+
+    private TopologyContext mockTopologyContext() {
+        TopologyContext context = mock(TopologyContext.class);
+        when(context.getStormId()).thenReturn("test-topology-1");
+        when(context.getThisWorkerPort()).thenReturn(6700);
+        return context;
+    }
+
+    @Test
+    public void testFileRotation() throws IOException {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, tempDir.toAbsolutePath().toString());
+        // The rotation size is configured in whole MB, so use the smallest allowed value (1 MB)
+        // and write ~100 KB events so the limit is crossed after roughly ten writes.
+        conf.put(Config.TOPOLOGY_EVENTLOGGER_ROTATION_SIZE_MB, 1);
+        conf.put(Config.TOPOLOGY_EVENTLOGGER_MAX_RETAINED_FILES, 2);
+
+        eventLogger.prepare(conf, new HashMap<>(), mockTopologyContext());
+
+        // 1 MB = 1048576 bytes; each event below is a little over 100,000 bytes.
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 100_000; i++) {
+            sb.append("A"); // 1 byte
+        }
+        String largeValue = sb.toString();
+
+        List<Object> values = new ArrayList<>();
+        values.add(largeValue);
+
+        // EventInfo.toString() adds some overhead, so each logged event is ~100 KB.
+        IEventLogger.EventInfo eventInfo = new IEventLogger.EventInfo(
+                System.currentTimeMillis(), "test-component", 1, "msgId", values);
+
+        Path expectedLogDir = tempDir.resolve("test-topology-1").resolve("6700");
+        Path logFile = expectedLogDir.resolve("events.log");
+        Path logFile1 = expectedLogDir.resolve("events.log.1");
+        Path logFile2 = expectedLogDir.resolve("events.log.2");
+        Path logFile3 = expectedLogDir.resolve("events.log.3");
+
+        // 10 writes (~1 MB) still fit in the first file. Rotation happens synchronously
+        // inside log(), so the file system can be checked right away without waiting.
+        for (int i = 0; i < 10; i++) {
+            eventLogger.log(eventInfo);
+        }
+        assertTrue(Files.exists(logFile));
+        assertFalse(Files.exists(logFile1), "No rotation expected before the 1 MB limit is crossed");
+
+        // Two more writes cross the limit: events.log is rotated to events.log.1.
+        eventLogger.log(eventInfo);
+        eventLogger.log(eventInfo);
+        assertTrue(Files.exists(logFile1), "Rotated file events.log.1 should exist");
+
+        // 12 more writes cross the limit again: events.log.1 shifts to events.log.2.
+        for (int i = 0; i < 12; i++) {
+            eventLogger.log(eventInfo);
+        }
+        assertTrue(Files.exists(logFile2), "Rotated file events.log.2 should exist");
+
+        // A third rotation must not keep more than maxRetainedFiles = 2 rotated files.
+        for (int i = 0; i < 12; i++) {
+            eventLogger.log(eventInfo);
+        }
+        assertFalse(Files.exists(logFile3), "Rotated file events.log.3 should not exist");
+        assertTrue(Files.exists(logFile1), "Rotated file events.log.1 should exist");
+        assertTrue(Files.exists(logFile2), "Rotated file events.log.2 should exist");
+    }
+}

Reply via email to