This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9363ea6cc SAMZA-2740: Provide a way to specify a static file name for
container metadata file (#1604)
9363ea6cc is described below
commit 9363ea6ccfb9f413dba4a5a49bcaa8b9de43b8bc
Author: Cameron Lee <[email protected]>
AuthorDate: Mon May 23 17:29:31 2022 -0700
SAMZA-2740: Provide a way to specify a static file name for container
metadata file (#1604)
API/usage changes: (backwards compatible) If the environment variable
CONTAINER_METADATA_FILENAME is specified, then use its value as the container
metadata file name. Otherwise, fall back to using the exec-env-container-id to
generate the file name.
---
.../java/org/apache/samza/config/JobConfig.java | 32 ++++++++----
.../samza/environment/EnvironmentVariables.java | 7 +++
.../org/apache/samza/util/DiagnosticsUtil.java | 1 +
.../org/apache/samza/config/TestJobConfig.java | 60 ++++++++++++++++++----
4 files changed, 80 insertions(+), 20 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index c77d673f9..4bff52d59 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -24,10 +24,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
import
org.apache.samza.container.grouper.stream.HashSystemStreamPartitionMapperFactory;
import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
+import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,7 +146,7 @@ public class JobConfig extends MapConfig {
static final int DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1;
// Naming format and directory for container.metadata file
- public static final String CONTAINER_METADATA_FILENAME_FORMAT =
"%s.metadata"; // Filename: <containerID>.metadata
+ public static final String CONTAINER_METADATA_FILENAME_FORMAT =
"%s.metadata";
public static final String CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY =
"samza.log.dir";
// Auto-sizing related configs that take precedence over respective sizing
configs job.container.count, etc,
@@ -430,18 +432,30 @@ public class JobConfig extends MapConfig {
}
/**
- * The metadata file is written in a {@code exec-env-container-id}.metadata
file in the log-dir of the container.
- * Here the {@code exec-env-container-id} refers to the ID assigned by the
cluster manager (e.g., YARN) to the container,
- * which uniquely identifies a container's lifecycle.
+ * Get the {@link File} in the "samza.log.dir" for writing container
metadata.
+ * If {@link EnvironmentVariables#ENV_CONTAINER_METADATA_FILENAME} is
specified, then use that as the file name.
+ * Otherwise, the file name is {@code exec-env-container-id}.metadata. Here
the {@code exec-env-container-id} refers
+ * to the ID assigned by the cluster manager (e.g., YARN) to the container,
which uniquely identifies a container's
+ * lifecycle.
*/
public static Optional<File> getMetadataFile(String execEnvContainerId) {
String dir = System.getProperty(CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY);
- if (dir == null || execEnvContainerId == null) {
- return Optional.empty();
- } else {
- return Optional.of(
- new File(dir, String.format(CONTAINER_METADATA_FILENAME_FORMAT,
execEnvContainerId)));
+ if (dir != null) {
+ String fileName =
System.getenv(EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME);
+ if (StringUtils.isNotBlank(fileName)) {
+ if (fileName.contains(File.separator)) {
+ throw new IllegalStateException(String.format("%s should not include
directories, but it is %s",
+ EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME, fileName));
+ } else {
+ return Optional.of(new File(dir, fileName));
+ }
+ } else {
+ if (execEnvContainerId != null) {
+ return Optional.of(new File(dir,
String.format(CONTAINER_METADATA_FILENAME_FORMAT, execEnvContainerId)));
+ }
+ }
}
+ return Optional.empty();
}
/**
diff --git
a/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
b/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
index 91977a206..a570e154c 100644
---
a/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
+++
b/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
@@ -36,4 +36,11 @@ public class EnvironmentVariables {
* environment variable.
*/
public static final String SAMZA_EPOCH_ID = "SAMZA_EPOCH_ID";
+
+ /**
+ * (Optional) File name to use for container metadata file. This should just
be a file name, and it should not include
+ * any directory structure.
+ * If this is not specified, then the framework will choose a file name.
+ */
+ public static final String ENV_CONTAINER_METADATA_FILENAME =
"CONTAINER_METADATA_FILENAME";
}
diff --git
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 7ef29ac71..9521f29bd 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -80,6 +80,7 @@ public class DiagnosticsUtil {
MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new
Metrics());
MetadataFileContents metadataFileContents =
new MetadataFileContents("1", new String(new
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+ log.info("Writing metadata contents to {}",
metadataFile.get().getPath());
new FileUtil().writeToTextFile(metadataFile.get(), new String(new
JsonSerde<>().toBytes(metadataFileContents)), false);
} else {
log.info("Skipping writing metadata file.");
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index a037da41b..f56af2937 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -18,25 +18,35 @@
*/
package org.apache.samza.config;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.SamzaException;
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory;
import
org.apache.samza.container.grouper.stream.HashSystemStreamPartitionMapperFactory;
import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
+import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
import org.junit.Assert;
import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({System.class, JobConfig.class})
public class TestJobConfig {
@Test
public void testGetName() {
@@ -549,13 +559,41 @@ public class TestJobConfig {
public void testGetMetadataFile() {
String execEnvContainerId = "container-id";
String containerMetadataDirectory = "/tmp/samza/log/dir";
- System.setProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY,
containerMetadataDirectory);
- assertEquals(new File(containerMetadataDirectory,
- String.format(JobConfig.CONTAINER_METADATA_FILENAME_FORMAT,
execEnvContainerId)).getPath(),
- JobConfig.getMetadataFile(execEnvContainerId).get().getPath());
- System.clearProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY);
+ String containerMetadataFileNameFromEnv =
"container-metadata-file.metadata";
+
+ PowerMockito.mockStatic(System.class);
+
PowerMockito.when(System.getProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY)).thenReturn(null);
+
PowerMockito.when(System.getenv(EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME)).thenReturn(null);
+ // samza.log.dir not specified
+ assertEquals(Optional.empty(),
JobConfig.getMetadataFile(execEnvContainerId));
+
+ // provide value for samza.log.dir for remainder of tests
+
PowerMockito.when(System.getProperty(JobConfig.CONTAINER_METADATA_DIRECTORY_SYS_PROPERTY))
+ .thenReturn(containerMetadataDirectory);
+
+ // CONTAINER_METADATA_FILENAME not specified, execEnvContainerId specified
+ assertEquals(Optional.of(new File(containerMetadataDirectory,
+ String.format(JobConfig.CONTAINER_METADATA_FILENAME_FORMAT,
execEnvContainerId))),
+ JobConfig.getMetadataFile(execEnvContainerId));
+
+ // CONTAINER_METADATA_FILENAME not specified, execEnvContainerId not
specified
assertEquals(Optional.empty(), JobConfig.getMetadataFile(null));
+
+ // CONTAINER_METADATA_FILENAME specified
+
PowerMockito.when(System.getenv(EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME))
+ .thenReturn(containerMetadataFileNameFromEnv);
+ assertEquals(Optional.of(new File(containerMetadataDirectory,
containerMetadataFileNameFromEnv)),
+ JobConfig.getMetadataFile(execEnvContainerId));
+
+ // CONTAINER_METADATA_FILENAME invalid
+
PowerMockito.when(System.getenv(EnvironmentVariables.ENV_CONTAINER_METADATA_FILENAME))
+ .thenReturn("file/with/directories/file.txt");
+ try {
+ JobConfig.getMetadataFile(execEnvContainerId);
+ } catch (IllegalStateException e) {
+ // expected to throw exception for having directories in file name
+ }
}
@Test