robertwb commented on a change in pull request #15137:
URL: https://github.com/apache/beam/pull/15137#discussion_r672635315



##########
File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -440,3 +441,25 @@ examples:
     shardId: "",
     key: "key"
   }
+
+---
+
+# Java code snippet to generate example bytes:
+#  TimestampPrefixingWindowCoder<IntervalWindow> coder = TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of());
+#  Instant end = new Instant(-9223372036854410L);
+#  Duration span = Duration.millis(365L);
+#  IntervalWindow window = new IntervalWindow(end.minus(span), span);
+#  byte[] bytes = CoderUtils.encodeToByteArray(coder, window);
+#  String str = new String(bytes, java.nio.charset.StandardCharsets.ISO_8859_1);
+#  String example = "";
+#  for(int i = 0; i < str.length(); i++){
+#    example += CharUtils.unicodeEscaped(str.charAt(i));
+#  }
+#  System.out.println(example);
+coder:
+  urn: "beam:coder:custom_window:v1"

Review comment:
       Should this be timestamp_prefixed (similar to length_prefixed)? 

##########
File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -440,3 +441,25 @@ examples:
     shardId: "",
     key: "key"
   }
+
+---
+
+# Java code snippet to generate example bytes:
+#  TimestampPrefixingWindowCoder<IntervalWindow> coder = TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of());
+#  Instant end = new Instant(-9223372036854410L);
+#  Duration span = Duration.millis(365L);
+#  IntervalWindow window = new IntervalWindow(end.minus(span), span);
+#  byte[] bytes = CoderUtils.encodeToByteArray(coder, window);
+#  String str = new String(bytes, java.nio.charset.StandardCharsets.ISO_8859_1);
+#  String example = "";
+#  for(int i = 0; i < str.length(); i++){
+#    example += CharUtils.unicodeEscaped(str.charAt(i));
+#  }
+#  System.out.println(example);
+coder:
+  urn: "beam:coder:custom_window:v1"
+  components: [{urn: "beam:coder:interval_window:v1"}]

Review comment:
       This shouldn't be limited to interval_window; any window coder should do.

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -939,6 +939,31 @@ message StandardCoders {
     // Experimental.
     STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];
 
+
+    // Encodes an arbitrary user defined window and its max timestamp (inclusive).
+    // The encoding format is:
+    //   maxTimestamp window

Review comment:
       How is the window encoded? Should we length-prefix it as well? Or do we 
expect TimestampPrefixed(LengthPrefixed(CustomWindowCoder))? 
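
To make the two options concrete, here is a minimal Python sketch of both layouts. It is illustrative only: the helper names (`encode_max_timestamp`, `encode_varint`, `frame_window`) are hypothetical and are not part of the PR or the Beam SDK. The timestamp shift assumes the sign-bit offset described in the proto comment, and the length prefix is assumed to be an unsigned base-128 varint.

```python
import struct

_INT64_MIN = -(1 << 63)


def encode_max_timestamp(millis: int) -> bytes:
    """Big-endian 8 bytes, offset so negative values sort before positive ones."""
    return struct.pack(">Q", millis - _INT64_MIN)


def encode_varint(n: int) -> bytes:
    """Unsigned base-128 varint (assumed format for the length prefix)."""
    if n < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def frame_window(max_timestamp_millis: int, window_bytes: bytes,
                 length_prefix: bool) -> bytes:
    """Frame an already-encoded window.

    length_prefix=False: maxTimestamp window
    length_prefix=True:  maxTimestamp len(window) window
    """
    body = window_bytes
    if length_prefix:
        body = encode_varint(len(window_bytes)) + window_bytes
    return encode_max_timestamp(max_timestamp_millis) + body
```

With the length-prefixed layout, a reader that does not know the inner window coder could still read the timestamp and skip or copy the remaining window bytes; without it, the reader has to know how to decode the window to find where it ends.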

##########
File path: sdks/python/apache_beam/coders/coders.py
##########
@@ -1509,3 +1509,47 @@ def __repr__(self):
 
 Coder.register_structured_urn(
     common_urns.coders.SHARDED_KEY.urn, ShardedKeyCoder)
+
+
+class TimestampPrefixingWindowCoder(FastCoder):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Coder which prefixes the max timestamp of arbitrary window to its encoded
+  form."""
+  def __init__(self, window_coder: Coder) -> None:
+    self._window_coder = window_coder
+
+  def _create_impl(self):
+    return coder_impl.TimestampPrefixingWindowCoderImpl(
+        self._window_coder.get_impl())
+
+  def to_type_hint(self):
+    return self._window_coder.to_type_hint()
+
+  def _get_component_coders(self) -> List[Coder]:
+    return [self._window_coder]
+
+  def is_deterministic(self) -> bool:
+    return self._window_coder.is_deterministic()
+
+  def as_cloud_object(self, coders_context=None):

Review comment:
       Is this needed?

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -939,6 +939,31 @@ message StandardCoders {
     // Experimental.
     STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];
 
+
+    // Encodes an arbitrary user defined window and its max timestamp (inclusive).
+    // The encoding format is:
+    //   maxTimestamp window
+    //
+    //   maxTimestamp - A big endian 8 byte integer representing millis-since-epoch.
+    //     The encoded representation is shifted so that the byte representation
+    //     of negative values are lexicographically ordered before the byte
+    //     representation of positive values. This is typically done by

Review comment:
       "typically done" makes it sound like any transformation preserving 
lexicographic ordering is acceptable. There is no choice here. 
   
   Perhaps we could reference other coders here (e.g. the encoding used for 
TimestampCoder, or in WindowedValueCoder)? 

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/coders/TimestampPrefixingWindowCoderTest.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.CoderProperties.TestElementByteSizeObserver;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+public class TimestampPrefixingWindowCoderTest {
+
+  private static class CustomWindow extends IntervalWindow {
+    private boolean isBig;
+
+    CustomWindow(Instant start, Instant end, boolean isBig) {
+      super(start, end);
+      this.isBig = isBig;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CustomWindow that = (CustomWindow) o;
+      return super.equals(o) && this.isBig == that.isBig;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), isBig);
+    }
+  }
+
+  private static class CustomWindowCoder extends CustomCoder<CustomWindow> {
+
+    private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder();
+    private static final int REGISTER_BYTE_SIZE = 1234;

Review comment:
       I'm not following what the `REGISTER_BYTE_SIZE` constant means. 
Would `BYTE_SIZE_TO_REGISTER` be clearer?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
##########
@@ -21,6 +21,7 @@
 class CloudObjectKinds {
   static final String KIND_GLOBAL_WINDOW = "kind:global_window";
   static final String KIND_INTERVAL_WINDOW = "kind:interval_window";
+  static final String KIND_CUSTOM_WINDOW = "kind:custom_window";

Review comment:
       Why is this needed?



