This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new bd9e9d8  SAMZA-1949: Add java docs and configuration documentation for side inputs (#1064)
bd9e9d8 is described below

commit bd9e9d8ca1d64f9545e92bf2b40e998df0fd5751
Author: mynameborat <[email protected]>
AuthorDate: Fri Nov 8 14:38:11 2019 -0800

    SAMZA-1949: Add java docs and configuration documentation for side inputs (#1064)
    
    Side inputs feature and configuration documentation
---
 .../documentation/versioned/api/low-level-api.md   | 25 ++++++++
 .../versioned/api/programming-model.md             |  3 +-
 .../learn/documentation/versioned/api/table-api.md | 66 +++++++++++++++++++++-
 .../versioned/jobs/samza-configurations.md         |  2 +
 .../table/descriptors/LocalTableDescriptor.java    | 17 ++++++
 5 files changed, 110 insertions(+), 3 deletions(-)

diff --git a/docs/learn/documentation/versioned/api/low-level-api.md b/docs/learn/documentation/versioned/api/low-level-api.md
index e3466af..6d345c1 100644
--- a/docs/learn/documentation/versioned/api/low-level-api.md
+++ b/docs/learn/documentation/versioned/api/low-level-api.md
@@ -274,6 +274,24 @@ For example:
 
 {% endhighlight %}
 
+### Side Inputs for Local Tables
+
+To populate a local [Table](javadocs/org/apache/samza/table/Table.html) from secondary data sources, you can use side inputs to specify the source streams. The table descriptor also takes a `SideInputsProcessor`, which is applied to incoming messages before the resulting entries are written to the table. Side input properties are specified on the `TableDescriptor` that is registered with the `TaskApplicationDescriptor`.
+
+The following code snippet shows a sample `TableDescriptor` for a local table that is backed by side inputs.
+
+{% highlight java %}
+
+    RocksDbTableDescriptor<String, Profile> tableDesc =
+      new RocksDbTableDescriptor<>("viewCounts", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(Profile.class)))
+        .withSideInputs(ImmutableList.of("profile"))
+        .withSideInputsProcessor((message, store) -> {
+          // Transform each side input message into the entries to write to the table
+          ...
+        });
+
+{% endhighlight %} 
+
 ### Legacy Applications
 
 For legacy Low Level API applications, you can continue specifying your 
system, stream and store properties along with your task.class in 
configuration. An incomplete example of configuration for legacy task 
application looks like this (see the 
[configuration](../jobs/configuration.html) documentation for more detail):
@@ -296,4 +314,11 @@ For legacy Low Level API applications, you can continue specifying your system,
     # Use the "json" serializer for messages in the "PageViewEvent" topic
     systems.kafka.streams.PageViewEvent.samza.msg.serde=json
     
+    # Use "ProfileEvent" from the "kafka" system as the side input for "profile-store"
+    stores.profile-store.side.inputs=kafka.ProfileEvent
+    
+    # Use "MySideInputsProcessorFactory" to instantiate the "SideInputsProcessor"
+    # that will be applied to "ProfileEvent" messages before they are written to "profile-store"
+    stores.profile-store.side.inputs.processor.factory=org.apache.samza.MySideInputsProcessorFactory
+    
 {% endhighlight %}
\ No newline at end of file
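The side inputs flow documented above, where a processor turns each side input message into entries that are then written to the store, can be sketched without any Samza dependencies. This is a simplified model under stated assumptions, not Samza's implementation: `ProfileEvent`, `SideInputsProcessorModel` and `SideInputsModel` are hypothetical names, a `HashMap` stands in for the store, and the processor interface only mirrors the shape of `SideInputsProcessor` (a message and the store in, a collection of entries out).

```java
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical deserialized message from the "ProfileEvent" side input stream.
final class ProfileEvent {
  final int memberId;
  final String company;

  ProfileEvent(int memberId, String company) {
    this.memberId = memberId;
    this.company = company;
  }
}

// Hypothetical stand-in that mirrors the shape of Samza's SideInputsProcessor:
// it maps one incoming message (plus read access to the store) to the entries to write.
interface SideInputsProcessorModel<K, V> {
  Collection<Map.Entry<K, V>> process(Object message, Map<K, V> store);
}

final class SideInputsModel {
  private SideInputsModel() {}

  // Applies the processor to each message in stream order and writes the resulting
  // entries to the store; a later entry with the same key overwrites an earlier one.
  static <K, V> void apply(List<?> messages, SideInputsProcessorModel<K, V> processor, Map<K, V> store) {
    for (Object message : messages) {
      for (Map.Entry<K, V> entry : processor.process(message, store)) {
        store.put(entry.getKey(), entry.getValue());
      }
    }
  }

  // Example processor (hypothetical): key each profile by member id and keep only the company.
  static final SideInputsProcessorModel<Integer, String> PROFILE_PROCESSOR = (message, store) -> {
    ProfileEvent profile = (ProfileEvent) message;
    return Collections.<Map.Entry<Integer, String>>singletonList(
        new AbstractMap.SimpleEntry<>(profile.memberId, profile.company));
  };
}
```

Because the processor is the only write path in this model, everything in the store can be re-derived from the source stream, which is the property that lets a side-input-backed store skip the changelog.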
diff --git a/docs/learn/documentation/versioned/api/programming-model.md b/docs/learn/documentation/versioned/api/programming-model.md
index 943c573..8c93011 100644
--- a/docs/learn/documentation/versioned/api/programming-model.md
+++ b/docs/learn/documentation/versioned/api/programming-model.md
@@ -71,8 +71,7 @@ Descriptors let you specify the properties of various aspects of your applicatio
 
[InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor.html)s
 and 
[OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor.html)s
 can be used for specifying Samza and implementation-specific properties of the 
streaming inputs and outputs for your application. You can obtain 
InputDescriptors and OutputDescriptors using a 
[SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor.html)
 for your system. This SystemDescrip [...]
 can use the 
[GenericSystemDescriptor](javadocs/org/apache/samza/system/descriptors/GenericSystemDescriptor.html).
 
-A 
[TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor.html)
 can be used for specifying Samza and implementation-specific properties of a 
[Table](javadocs/org/apache/samza/table/Table.html). You can use a Local 
TableDescriptor (e.g. 
[RocksDbTableDescriptor](javadocs/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.html)
 or a 
[RemoteTableDescriptor](javadocs/org/apache/samza/table/descriptors/RemoteTableDescriptor).
-
+A [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor.html) can be used for specifying Samza and implementation-specific properties of a [Table](javadocs/org/apache/samza/table/Table.html). You can use a local TableDescriptor (e.g. [RocksDbTableDescriptor](javadocs/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.html)) or a [RemoteTableDescriptor](javadocs/org/apache/samza/table/descriptors/RemoteTableDescriptor.html).
    
 
 The following example illustrates how you can use input and output descriptors 
for a Kafka system, and a table descriptor for a local RocksDB table within 
your application:
 
diff --git a/docs/learn/documentation/versioned/api/table-api.md b/docs/learn/documentation/versioned/api/table-api.md
index 106d27a..3511233 100644
--- a/docs/learn/documentation/versioned/api/table-api.md
+++ b/docs/learn/documentation/versioned/api/table-api.md
@@ -180,6 +180,66 @@ join with a table and finally write the output to another table.
    function defined in lines 30-39.
 6. Line 12: writes the join result stream to another table
 
+# Using Table with Samza High Level API and Side Inputs
+
+The code snippet below illustrates how to use a table that is populated by side inputs with the Samza High Level API.
+
+{% highlight java %}
+
+ 1  class SamzaStreamApplication implements StreamApplication {
+ 2    @Override
+ 3    public void describe(StreamApplicationDescriptor appDesc) {
+ 4      TableDescriptor<Integer, Profile> desc = new InMemoryTableDescriptor<>(
+ 5          "t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+ 6          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
+ 7          .withSideInputsProcessor((msg, store) -> {
+ 8              Profile profile = (Profile) msg.getMessage();
+ 9              int key = profile.getMemberId();
+10              return ImmutableList.of(new Entry<>(key, profile));
+11            });
+12
+13      Table<KV<Integer, Profile>> table = appDesc.getTable(desc);
+14
+15      appDesc.getInputStream("PageView", new NoOpSerde<PageView>())
+16          .map(new MyMapFunc())
+17          .join(table, new MyJoinFunc())
+18          .sendTo(anotherTable);
+19    }
+20  }
+21
+22  static class MyMapFunc implements MapFunction<PageView, KV<Integer, PageView>> {
+23    private ReadableTable<Integer, Profile> profileTable;
+24
+25    @Override
+26    public void init(Config config, TaskContext context) {
+27      profileTable = (ReadableTable<Integer, Profile>) context.getTable("t1");
+28    }
+29
+30    @Override
+31    public KV<Integer, PageView> apply(PageView message) {
+32      // Key each page view by id so it can be joined against the profile table
+33      return KV.of(message.getId(), message);
+34    }
+35  }
+36
+37  static class MyJoinFunc implements StreamTableJoinFunction
+38      <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
+39
+40    @Override
+41    public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, Profile> r) {
+42      // r is null when no profile matches the page view's key
+43      return r == null ? null : new EnrichedPageView(
+44          m.getValue().getPageKey(), m.getKey(), r.getValue().getCompany());
+45    }
+46
+47    @Override
+48    public Integer getMessageKey(KV<Integer, PageView> m) {
+49      return m.getKey();
+50    }
+51
+52    @Override
+53    public Integer getRecordKey(KV<Integer, Profile> r) {
+54      return r.getKey();
+55    }
+56  }
+
+{% endhighlight %}
+
+The code above uses side inputs to populate the profile table:
+
+1. Line 6: specifies the source stream for the profile table.
+2. Lines 7-11: provide an implementation of `SideInputsProcessor` that reads from the profile stream and populates the table.
+3. Line 17: joins incoming page views against the profile table.
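The join on line 17 is a stream-table join: each page view is keyed by `MyMapFunc`, the key is looked up in the side-input-backed profile table, and the join function builds the enriched record. A dependency-free sketch of that lookup follows. `PageView`, `EnrichedPageView` and `StreamTableJoinModel` here are simplified hypothetical types, a `Map` stands in for the table, and dropping unmatched page views is this sketch's own choice, made to mirror the join function's `null` result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified page view event keyed by member id.
final class PageView {
  final int memberId;
  final String pageKey;

  PageView(int memberId, String pageKey) {
    this.memberId = memberId;
    this.pageKey = pageKey;
  }
}

// Hypothetical join output: the page view enriched with the member's company.
final class EnrichedPageView {
  final String pageKey;
  final int memberId;
  final String company;

  EnrichedPageView(String pageKey, int memberId, String company) {
    this.pageKey = pageKey;
    this.memberId = memberId;
    this.company = company;
  }
}

final class StreamTableJoinModel {
  private StreamTableJoinModel() {}

  // For each page view, looks up the table by the page view's key and emits an
  // enriched record; page views without a matching table entry are dropped.
  static List<EnrichedPageView> join(List<PageView> pageViews, Map<Integer, String> companyByMember) {
    List<EnrichedPageView> out = new ArrayList<>();
    for (PageView pv : pageViews) {
      String company = companyByMember.get(pv.memberId);
      if (company != null) {
        out.add(new EnrichedPageView(pv.pageKey, pv.memberId, company));
      }
    }
    return out;
  }
}
```

The stream side is never stored in this model; only the table is, which is why the table must be fully bootstrapped from its side input before lookups give complete results.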
+
 # Using Table with Samza Low Level API
 
 The code snippet below illustrates the usage of table in Samza Low Level Task 
API.
@@ -443,7 +503,11 @@ on the current implementation of in-memory and RocksDB stores. Both tables provi
 feature parity to existing in-memory and RocksDB-based stores. For more 
detailed 
 information please refer to 
 [`RocksDbTableDescriptor`](https://github.com/apache/samza/blob/master/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java)
 and 
-[`InMemoryTableDescriptor`] 
(https://github.com/apache/samza/blob/master/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java).
 
+[`InMemoryTableDescriptor`](https://github.com/apache/samza/blob/master/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java).
+
+Local tables that are populated from secondary data sources can use side inputs to load that data. In the event of a failure, the data is bootstrapped from the side input streams instead of from a changelog. The side input streams and the processor implementation are provided as properties of the `TableDescriptor`.
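The restore path described above can be illustrated with a small model: because a side-input-backed store holds only data derived from the source stream, replaying that stream from its oldest offset into an empty store reproduces the store's contents without a changelog. This is a hypothetical, dependency-free sketch (`SideInputRecord` and `SideInputRestoreModel` are made-up names; a `List` stands in for the stream and a `HashMap` for the store), not how Samza implements bootstrapping.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical key-value record read from a side input stream.
final class SideInputRecord {
  final String key;
  final String value;

  SideInputRecord(String key, String value) {
    this.key = key;
    this.value = value;
  }
}

final class SideInputRestoreModel {
  private SideInputRestoreModel() {}

  // The single write path: a record is applied to the store the same way during
  // normal processing and during a bootstrap.
  static void apply(Map<String, String> store, SideInputRecord record) {
    store.put(record.key, record.value);
  }

  // Rebuilds a store that has no local copy by replaying the side input stream
  // from its oldest offset up to (but excluding) endOffset.
  static Map<String, String> bootstrap(List<SideInputRecord> sideInput, int endOffset) {
    Map<String, String> store = new HashMap<>();
    for (int offset = 0; offset < endOffset; offset++) {
      apply(store, sideInput.get(offset));
    }
    return store;
  }
}
```

Replaying a long stream is the cost of this approach, which is why the configuration reference recommends host affinity: a task that comes back on the same host can reuse its local copy instead of replaying from the start.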
 
 ## Hybrid Table
 
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 8637a4a..1a2e04b 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -280,6 +280,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream
 |stores.**_store-name_**.<br>rocksdb.metrics.list|(none)|A list of [RocksDB 
properties](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409)
 to expose as metrics (gauges).|
 
|stores.**_store-name_**.<br>rocksdb.delete.obsolete.files.period.micros|21600000000|This
 property specifies the period in microseconds to delete obsolete files 
regardless of files removed during compaction. Allowed range is up to 
9223372036854775807.|
 
|stores.**_store-name_**.<br>rocksdb.max.manifest.file.size|18446744073709551615|This
 property specifies the maximum size of the MANIFEST data file, after which it 
is rotated. Default value is also the maximum, making it practically unlimited: 
only one manifest file is used.|
+|stores.**_store-name_**.<br>side.inputs|(none)|Samza applications with stores that are populated by a secondary data source such as HDFS, but are otherwise read-only, can leverage side inputs. Stores configured with side inputs use the source streams to bootstrap data in the absence of a local copy, thereby avoiding an additional copy of the data in a changelog. It is also recommended to enable the host affinity feature when turning on side inputs to prevent bootstrapping of the data during cont [...]
+|stores.**_store-name_**.<br>side.inputs.processor.factory|(none)|The value is the fully-qualified name of a Java class that implements <a href="../api/javadocs/org/apache/samza/storage/SideInputsProcessorFactory.html">SideInputsProcessorFactory</a>. It is a required configuration for stores with side inputs (`stores.store-name.side.inputs`).|
 
 ### <a name="deployment"></a>[5. Deployment](#deployment)
 Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) 
[deployment models](../deployment/deployment-model.html). Below are the 
configurations options for both models.
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index 1623710..8fff2b5 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -67,6 +67,16 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
     this.serde = serde;
   }
 
+  /**
+   * Add side inputs to the table. Each stream is of the format
+   * <i>system-name</i>.<i>stream-name</i>. The streams are marked as bootstrap streams, and once the table is bootstrapped, it is
+   * updated in the background in change capture mode.
+   * Applications should specify the transformation logic using {@link #withSideInputsProcessor(SideInputsProcessor)}, which
+   * will be applied to the incoming messages; the results are written to the table.
+   *
+   * @param sideInputs list of side input streams
+   * @return this table descriptor instance
+   */
   public D withSideInputs(List<String> sideInputs) {
     this.sideInputs = sideInputs;
     // Disable changelog
@@ -76,6 +86,13 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
     return (D) this;
   }
 
+  /**
+   * Provide the {@link SideInputsProcessor} for this table. It is applied to messages from the side inputs, and the results are
+   * written to the table.
+   *
+   * @param sideInputsProcessor the side inputs processor
+   * @return this table descriptor instance
+   */
   public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
     this.sideInputsProcessor = sideInputsProcessor;
     return (D) this;
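Both `withSideInputs` and the `stores.store-name.side.inputs` configuration identify each side input as <i>system-name</i>.<i>stream-name</i>. A hypothetical helper that validates and splits such names is sketched below. It assumes the configuration value is a comma-separated list and that the system name ends at the first `.` (so stream names may themselves contain dots); `SideInputNames` is a made-up class, not part of Samza, and `kafka.CompanyEvent` in the comments is only an illustrative stream name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for side input names; not part of Samza.
final class SideInputNames {
  private SideInputNames() {}

  // Splits "system-name.stream-name" at the first '.' into {system, stream}.
  static String[] parse(String systemStream) {
    int dot = systemStream.indexOf('.');
    if (dot <= 0 || dot == systemStream.length() - 1) {
      throw new IllegalArgumentException("Expected <system-name>.<stream-name> but got: " + systemStream);
    }
    return new String[] {systemStream.substring(0, dot), systemStream.substring(dot + 1)};
  }

  // Parses a comma-separated value such as "kafka.ProfileEvent, kafka.CompanyEvent",
  // trimming whitespace and skipping empty items.
  static List<String[]> parseList(String configValue) {
    List<String[]> result = new ArrayList<>();
    for (String name : configValue.split(",")) {
      String trimmed = name.trim();
      if (!trimmed.isEmpty()) {
        result.add(parse(trimmed));
      }
    }
    return result;
  }
}
```

Rejecting a bare stream name early, as `parse` does, surfaces a missing system prefix at configuration time rather than when the store first tries to bootstrap.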
