This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9363ea6cc SAMZA-2740: Provide a way to specify a static file name for container metadata file (#1604)
9363ea6cc is described below

commit 9363ea6ccfb9f413dba4a5a49bcaa8b9de43b8bc
Author: Cameron Lee <[email protected]>
AuthorDate: Mon May 23 17:29:31 2022 -0700

    SAMZA-2740: Provide a way to specify a static file name for container metadata file (#1604)
    
    API/usage changes: (backwards compatible) If CONTAINER_METADATA_FILENAME is specified, then use that for generating the container metadata file name. Otherwise, fall back to using the exec-env-container-id to generate the file name.
---
 .../java/org/apache/samza/config/JobConfig.java    | 32 ++++++++----
 .../samza/environment/EnvironmentVariables.java    |  7 +++
 .../org/apache/samza/util/DiagnosticsUtil.java     |  1 +
 .../org/apache/samza/config/TestJobConfig.java     | 60 ++++++++++++++++++----
 4 files changed, 80 insertions(+), 20 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index c77d673f9..4bff52d59 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -24,10 +24,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
 import 
org.apache.samza.container.grouper.stream.HashSystemStreamPartitionMapperFactory;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
+import org.apache.samza.environment.EnvironmentVariables;
 import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,7 +146,7 @@ public class JobConfig extends MapConfig {
   static final int DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1;
 
   // Naming format and directory for container.metadata file
-  public static final String CONTAINER_METADATA_FILENAME_FORMAT = 
"%s.metadata"; // Filename: <containerID>.metadata
+  public static final String CONTAINER_METADATA_FILENAME_FORMAT = 
"%s.metadata";
   public static final String CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY = 
"samza.log.dir";
 
   // Auto-sizing related configs that take precedence over respective sizing 
confings job.container.count, etc,
@@ -430,18 +432,30 @@ public class JobConfig extends MapConfig {
   }
 
   /**
-   * The metadata file is written in a {@code exec-env-container-id}.metadata 
file in the log-dir of the container.
-   * Here the {@code exec-env-container-id} refers to the ID assigned by the 
cluster manager (e.g., YARN) to the container,
-   * which uniquely identifies a container's lifecycle.
+   * Get the {@link File} in the "samza.log.dir" for writing container 
metadata.
+   * If {@link EnvironmentVariables#ENV_CONTAINER_METADATA_FILENAME} is 
specified, then use that as the file name.
+   * Otherwise, the file name is {@code exec-env-container-id}.metadata. Here 
the {@code exec-env-container-id} refers
+   * to the ID assigned by the cluster manager (e.g., YARN) to the container, 
which uniquely identifies a container's
+   * lifecycle.
    */
   public static Optional<File> getMetadataFile(String execEnvContainerId) {
     String dir = System.getProperty(CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY);
-    if (dir == null || execEnvContainerId == null) {
-      return Optional.empty();
-    } else {
-      return Optional.of(
-          new File(dir, String.format(CONTAINER_METADATA_FILENAME_FORMAT, 
execEnvContainerId)));
+    if (dir != null) {
+      String fileName = 
System.getenv(EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME);
+      if (StringUtils.isNotBlank(fileName)) {
+        if (fileName.contains(File.separator)) {
+          throw new IllegalStateException(String.format("%s should not include 
directories, but it is %s",
+              EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME, fileName));
+        } else {
+          return Optional.of(new File(dir, fileName));
+        }
+      } else {
+        if (execEnvContainerId != null) {
+          return Optional.of(new File(dir, 
String.format(CONTAINER_METADATA_FILENAME_FORMAT, execEnvContainerId)));
+        }
+      }
     }
+    return Optional.empty();
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
 
b/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
index 91977a206..a570e154c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
+++ 
b/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
@@ -36,4 +36,11 @@ public class EnvironmentVariables {
    * environment variable.
    */
   public static final String SAMZA_EPOCH_ID = "SAMZA_EPOCH_ID";
+
+  /**
+   * (Optional) File name to use for container metadata file. This should just 
be a file name, and it should not include
+   * any directory structure.
+   * If this is not specified, then the framework will choose a file name.
+   */
+  public static final String ENV_CONTAINER_METADATA_FILENAME = 
"CONTAINER_METADATA_FILENAME";
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java 
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 7ef29ac71..9521f29bd 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -80,6 +80,7 @@ public class DiagnosticsUtil {
       MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new 
Metrics());
       MetadataFileContents metadataFileContents =
           new MetadataFileContents("1", new String(new 
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+      log.info("Writing metadata contents to {}", 
metadataFile.get().getPath());
       new FileUtil().writeToTextFile(metadataFile.get(), new String(new 
JsonSerde<>().toBytes(metadataFileContents)), false);
     } else {
       log.info("Skipping writing metadata file.");
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index a037da41b..f56af2937 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -18,25 +18,35 @@
  */
 package org.apache.samza.config;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
 import 
org.apache.samza.container.grouper.stream.HashSystemStreamPartitionMapperFactory;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
+import org.apache.samza.environment.EnvironmentVariables;
 import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
 import org.junit.Assert;
 import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({System.class, JobConfig.class})
 public class TestJobConfig {
   @Test
   public void testGetName() {
@@ -549,13 +559,41 @@ public class TestJobConfig {
   public void testGetMetadataFile() {
     String execEnvContainerId = "container-id";
     String containerMetadataDirectory = "/tmp/samza/log/dir";
-    System.setProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY, 
containerMetadataDirectory);
-    assertEquals(new File(containerMetadataDirectory,
-            String.format(JobConfig.CONTAINER_METADATA_FILENAME_FORMAT, 
execEnvContainerId)).getPath(),
-        JobConfig.getMetadataFile(execEnvContainerId).get().getPath());
-    System.clearProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY);
+    String containerMetadataFileNameFromEnv = 
"container-metadata-file.metadata";
+
+    PowerMockito.mockStatic(System.class);
+    
PowerMockito.when(System.getProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY)).thenReturn(null);
+    
PowerMockito.when(System.getenv(EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME)).thenReturn(null);
 
+    // samza.log.dir not specified
+    assertEquals(Optional.empty(), 
JobConfig.getMetadataFile(execEnvContainerId));
+
+    // provide value for samza.log.dir for remainder of tests
+    
PowerMockito.when(System.getProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY))
+        .thenReturn(containerMetadataDirectory);
+
+    // CONTAINER_METADATA_FILENAME not specified, execEnvContainerId specified
+    assertEquals(Optional.of(new File(containerMetadataDirectory,
+            String.format(JobConfig.CONTAINER_METADATA_FILENAME_FORMAT, 
execEnvContainerId))),
+        JobConfig.getMetadataFile(execEnvContainerId));
+
+    // CONTAINER_METADATA_FILENAME not specified, execEnvContainerId not 
specified
     assertEquals(Optional.empty(), JobConfig.getMetadataFile(null));
+
+    // CONTAINER_METADATA_FILENAME specified
+    
PowerMockito.when(System.getenv(EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME))
+        .thenReturn(containerMetadataFileNameFromEnv);
+    assertEquals(Optional.of(new File(containerMetadataDirectory, 
containerMetadataFileNameFromEnv)),
+        JobConfig.getMetadataFile(execEnvContainerId));
+
+    // CONTAINER_METADATA_FILENAME invalid
+    
PowerMockito.when(System.getenv(EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME))
+        .thenReturn("file/with/directories/file.txt");
+    try {
+      JobConfig.getMetadataFile(execEnvContainerId);
+    } catch (IllegalStateException e) {
+      // expected to throw exception for having directories in file name
+    }
   }
 
   @Test

Reply via email to