xinyuiscool commented on a change in pull request #1560:
URL: https://github.com/apache/samza/pull/1560#discussion_r785030967
##########
File path:
samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
##########
@@ -189,10 +190,19 @@ public void sink(SinkFunction<? super M> sinkFn) {
}
@Override
- public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table, Object
... args) {
+ public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table) {
String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
SendToTableOperatorSpec<K, V> op =
- OperatorSpecs.createSendToTableOperatorSpec(((TableImpl)
table).getTableId(), opId, args);
+ OperatorSpecs.createSendToTableOperatorSpec(((TableImpl)
table).getTableId(), opId);
+ this.operatorSpec.registerNextOperatorSpec(op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
+ }
+
+ @Override
+ public <K, V, U> MessageStream<KV<K, UpdateMessage<U, V>>>
sendTo(Table<KV<K, V>> table, UpdateOptions contract) {
Review comment:
nit: update var name?
##########
File path:
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
##########
@@ -227,14 +228,28 @@ public static InputOperatorSpec createInputOperatorSpec(
*
* @param tableId the table Id for the underlying table
* @param opId the unique ID of the operator
- * @param args additional arguments passed to the table
* @param <K> the type of the table record key
* @param <V> the type of the table record value
* @return the {@link SendToTableOperatorSpec}
*/
public static <K, V> SendToTableOperatorSpec<K, V>
createSendToTableOperatorSpec(
- String tableId, String opId, Object ... args) {
- return new SendToTableOperatorSpec(tableId, opId, args);
+ String tableId, String opId) {
+ return new SendToTableOperatorSpec(tableId, opId);
+ }
+
+ /**
+ * Creates a {@link SendToTableWithUpdateOperatorSpec} with a key extractor
and a value extractor function.
+ *
+ * @param tableId the table Id for the underlying table
+ * @param opId the unique ID of the operator
+ * @param <K> the type of the table record key
+ * @param <V> the type of the table record value
+ * @param <U> the type of the table record value
+ * @return the {@link SendToTableOperatorSpec}
+ */
+ public static <K, V, U> SendToTableWithUpdateOperatorSpec<K, V, U>
createSendToTableWithUpdateOperatorSpec(
+ String tableId, String opId, UpdateOptions contract) {
Review comment:
nit: change var name to updateOptions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]