This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch 1.5.0
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.5.0 by this push:
new abc7c3d SAMZA-2538: Update build.gradle from using config-path to
job.config.loader.properties.path
abc7c3d is described below
commit abc7c3d1ea712c69d7fac2f3c216043d6daef5db
Author: Ke Wu <[email protected]>
AuthorDate: Tue Jun 2 18:51:19 2020 -0700
SAMZA-2538: Update build.gradle from using config-path to
job.config.loader.properties.path
Symptom: Gradle tasks in samza-shell (checkpointTool, kvPerformanceTest,
runJob) do not work.
Cause: The Gradle tasks were not updated to be compatible with Samza 1.5.
Changes: Translate the configPath property into the job.config.loader.factory and
job.config.loader.properties.path configs instead of passing --config-path.
Tests:
1. ./gradlew samza-shell:runJob
-PconfigPath=/deploy/samza/config/wikipedia-feed.properties
2. ./gradlew samza-shell:kvPerformanceTest
-PconfigPath=$PWD/samza-test/src/main/config/perf/kv-perf.properties
API Changes: None
Upgrade Instructions: None
Usage Instructions: None
Author: Ke Wu <[email protected]>
Reviewers: mynameborat <[email protected]>
Closes #1371 from kw2542/SAMZA-2538
(cherry picked from commit b3d5dc3937830898d9213acf9156519fb574492c)
Signed-off-by: mynameborat <[email protected]>
---
README.md | 6 +++---
build.gradle | 18 ++++++++++++------
.../documentation/versioned/container/checkpointing.md | 6 +++---
.../versioned/container/coordinator-stream.md | 2 +-
.../versioned/container/state-management.md | 4 ++--
gradle/wrapper/gradle-wrapper.properties | 1 +
.../org/apache/samza/checkpoint/CheckpointTool.scala | 9 ++++-----
7 files changed, 26 insertions(+), 20 deletions(-)
diff --git a/README.md b/README.md
index ec1cd94..e5c91a4 100644
--- a/README.md
+++ b/README.md
@@ -62,15 +62,15 @@ To run standalone integration tests:
To run a job (defined in a properties file):
- ./gradlew samza-shell:runJob
-PconfigPath=file:///path/to/job/config.properties
+ ./gradlew samza-shell:runJob -PconfigPath=/path/to/job/config.properties
To inspect a job's latest checkpoint:
- ./gradlew samza-shell:checkpointTool
-PconfigPath=file:///path/to/job/config.properties
+ ./gradlew samza-shell:checkpointTool
-PconfigPath=/path/to/job/config.properties
To modify a job's checkpoint (assumes that the job is not currently running),
give it a file with the new offset for each partition, in the format
`systems.<system>.streams.<topic>.partitions.<partition>=<offset>`:
- ./gradlew samza-shell:checkpointTool
-PconfigPath=file:///path/to/job/config.properties \
+ ./gradlew samza-shell:checkpointTool
-PconfigPath=/path/to/job/config.properties \
-PnewOffsets=file:///path/to/new/offsets.properties
### Developers
diff --git a/build.gradle b/build.gradle
index 243fce8..41ff784 100644
--- a/build.gradle
+++ b/build.gradle
@@ -575,33 +575,39 @@ project(":samza-shell") {
}
// Usage: ./gradlew samza-shell:runJob \
- // -PconfigPath=file:///path/to/job/config.properties
+ // -PconfigPath=/path/to/job/config.properties
task runJob(type:JavaExec) {
description 'To run a job (defined in a properties file)'
main = 'org.apache.samza.job.JobRunner'
classpath = configurations.gradleShell
- if (project.hasProperty('configPath')) args += ['--config-path',
configPath]
+ if (project.hasProperty('configPath')) args += [
+ '--config',
'job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory',
+ '--config', 'job.config.loader.properties.path=' + configPath]
jvmArgs =
["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
}
// Usage: ./gradlew samza-shell:checkpointTool \
- // -PconfigPath=file:///path/to/job/config.properties
-PnewOffsets=file:///path/to/new/offsets.properties
+ // -PconfigPath=/path/to/job/config.properties
-PnewOffsets=/path/to/new/offsets.properties
task checkpointTool(type:JavaExec) {
description 'Command-line tool to inspect and manipulate the job’s
checkpoint'
main = 'org.apache.samza.checkpoint.CheckpointTool'
classpath = configurations.gradleShell
- if (project.hasProperty('configPath')) args += ['--config-path',
configPath]
+ if (project.hasProperty('configPath')) args += [
+ '--config',
'job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory',
+ '--config', 'job.config.loader.properties.path=' + configPath]
if (project.hasProperty('newOffsets')) args += ['--new-offsets',
newOffsets]
jvmArgs =
["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
}
// Usage: ./gradlew samza-shell:kvPerformanceTest
- // -PconfigPath=file:///path/to/job/config.properties
+ // -PconfigPath=/path/to/job/config.properties
task kvPerformanceTest(type:JavaExec) {
description 'Command-line tool to run key-value performance tests'
main = 'org.apache.samza.test.performance.TestKeyValuePerformance'
classpath = configurations.gradleShell
- if (project.hasProperty('configPath')) args += ['--config-path',
configPath]
+ if (project.hasProperty('configPath')) args += [
+ '--config',
'job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory',
+ '--config', 'job.config.loader.properties.path=' + configPath]
jvmArgs =
["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
}
}
diff --git a/docs/learn/documentation/versioned/container/checkpointing.md
b/docs/learn/documentation/versioned/container/checkpointing.md
index 9fb7e6d..a829b1f 100644
--- a/docs/learn/documentation/versioned/container/checkpointing.md
+++ b/docs/learn/documentation/versioned/container/checkpointing.md
@@ -108,15 +108,15 @@ To inspect a job's latest checkpoint, you need to specify
your job's config file
{% highlight bash %}
samza-example/target/bin/checkpoint-tool.sh \
- --config-path=file:///path/to/job/config.properties
+ --config-path=/path/to/job/config.properties
{% endhighlight %}
This command prints out the latest checkpoint in a properties file format. You
can save the output to a file, and edit it as you wish. For example, to jump
back to the oldest possible point in time, you can set all the offsets to 0.
Then you can feed that properties file back into checkpoint-tool.sh and save
the modified checkpoint:
{% highlight bash %}
samza-example/target/bin/checkpoint-tool.sh \
- --config-path=file:///path/to/job/config.properties \
- --new-offsets=file:///path/to/new/offsets.properties
+ --config-path=/path/to/job/config.properties \
+ --new-offsets=/path/to/new/offsets.properties
{% endhighlight %}
Note that Samza only reads checkpoints on container startup. In order for your
checkpoint change to take effect, you need to first stop the job, then save the
modified offsets, and then start the job again. If you write a checkpoint while
the job is running, it will most likely have no effect.
diff --git a/docs/learn/documentation/versioned/container/coordinator-stream.md
b/docs/learn/documentation/versioned/container/coordinator-stream.md
index 6b14960..8d528d6 100644
--- a/docs/learn/documentation/versioned/container/coordinator-stream.md
+++ b/docs/learn/documentation/versioned/container/coordinator-stream.md
@@ -114,7 +114,7 @@ The coordinator stream messages that are currently
supported are listed below:
Samza provides a command line tool to write Job Configuration messages to the
coordinator stream. The tool can be used as follows:
{% highlight bash %}
samza-example/target/bin/run-coordinator-stream-writer.sh \
- --config-path=file:///path/to/job/config.properties \
+ --config-path=/path/to/job/config.properties \
--type set-config \
--key job.container.count \
--value 8
diff --git a/docs/learn/documentation/versioned/container/state-management.md
b/docs/learn/documentation/versioned/container/state-management.md
index 86b0d44..4371541 100644
--- a/docs/learn/documentation/versioned/container/state-management.md
+++ b/docs/learn/documentation/versioned/container/state-management.md
@@ -192,7 +192,7 @@ Currently Samza provides a state storage tool which can
recover the state store
{% highlight bash %}
samza-example/target/bin/state-storage-tool.sh \
- --config-path=file:///path/to/job/config.properties \
+ --config-path=/path/to/job/config.properties \
--path=directory/to/put/state/stores
{% endhighlight %}
@@ -202,7 +202,7 @@ Samza also provides a tool to read the value from a running
job's RocksDB.
{% highlight bash %}
samza-example/target/bin/read-rocksdb-tool.sh \
- --config-path=file:///path/to/job/config.properties \
+ --config-path=/path/to/job/config.properties \
--db-path=/tmp/nm-local-dir/state/test-state/Partition_0 \
--db-name=test-state \
--string-key=a,b,c
diff --git a/gradle/wrapper/gradle-wrapper.properties
b/gradle/wrapper/gradle-wrapper.properties
index 44e7c4d..83639a3 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,3 +1,4 @@
+#Mon Jun 01 15:50:38 PDT 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip
diff --git
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 4db807e..7936c0a 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -20,7 +20,6 @@
package org.apache.samza.checkpoint
import java.io.FileInputStream
-import java.net.URI
import java.util
import java.util.Properties
import java.util.regex.Pattern
@@ -82,12 +81,12 @@ object CheckpointTool {
type TaskNameToCheckpointMap = Map[TaskName, Map[SystemStreamPartition,
String]]
class CheckpointToolCommandLine extends CommandLine with Logging {
- val newOffsetsOpt: ArgumentAcceptingOptionSpec[URI] =
- parser.accepts("new-offsets", "URI of file (e.g.
file:///some/local/path.properties) " +
+ val newOffsetsOpt: ArgumentAcceptingOptionSpec[String] =
+ parser.accepts("new-offsets", "Location of file (e.g.
/some/local/path.properties) " +
"containing offsets to write to the job's
checkpoint topic. " +
"If not given, this tool prints out the
current offsets.")
.withRequiredArg
- .ofType(classOf[URI])
+ .ofType(classOf[String])
.describedAs("path")
var newOffsets: TaskNameToCheckpointMap = _
@@ -121,7 +120,7 @@ object CheckpointTool {
override def loadConfig(options: OptionSet): Config = {
val config = super.loadConfig(options)
if (options.has(newOffsetsOpt)) {
- val newOffsetsInputStream = new
FileInputStream(options.valueOf(newOffsetsOpt).getPath)
+ val newOffsetsInputStream = new
FileInputStream(options.valueOf(newOffsetsOpt))
val properties = new Properties()
properties.load(newOffsetsInputStream)