y1chi commented on a change in pull request #12427:
URL: https://github.com/apache/beam/pull/12427#discussion_r463285687



##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py
##########
@@ -30,8 +30,17 @@
 from __future__ import absolute_import
 
 import apache_beam as beam
-from apache_beam.testing.benchmarks.nexmark.nexmark_util import ParseEventFn
 
 
-def load(raw_events, query_args=None):
-  return raw_events | 'ParseEventFn' >> beam.ParDo(ParseEventFn())  # pylint: 
disable=expression-not-assigned
+class round_tripFn(beam.DoFn):

Review comment:
       Class names shouldn't use snake case (use CapWords, e.g. RoundTripFn).

##########
File path: 
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):

Review comment:
       is this function necessary?

##########
File path: 
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):
+    return self.is_auction_window
+
+  def __str__(self):
+    return (
+        'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' 
%
+        (self.start, self.end, self.auction, self.is_auction_window))
+
+
+class AuctionOrBidWindowCoder(FastCoder):
+  def _create_impl(self):
+    return AuctionOrBidWindowCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
+
+
+class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl):
+  _super_coder_impl = coder_impl.IntervalWindowCoderImpl()
+  _id_coder_impl = coder_impl.VarIntCoderImpl()
+  _bool_coder_impl = coder_impl.BooleanCoderImpl()
+
+  def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested):
+    self._super_coder_impl.encode_to_stream(value, stream, True)
+    self._id_coder_impl.encode_to_stream(value.auction, stream, True)
+    self._bool_coder_impl.encode_to_stream(
+        value.is_auction_window, stream, True)
+
+  def decode_from_stream(self, stream, nested):
+    super_window = self._super_coder_impl.decode_from_stream(stream, True)
+    auction = self._id_coder_impl.decode_from_stream(stream, True)
+    is_auction = self._bool_coder_impl.decode_from_stream(stream, True)
+    return AuctionOrBidWindow(
+        super_window.start, super_window.end, auction, is_auction)
+
+
+class AuctionOrBidWindowFn(WindowFn):
+  def __init__(self, expected_duration_micro):
+    self.expected_duration = expected_duration_micro
+
+  def assign(self, assign_context):
+    event = assign_context.element
+    if isinstance(event, nexmark_model.Auction):
+      return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)]
+    elif isinstance(event, nexmark_model.Bid):
+      return [
+          AuctionOrBidWindow.for_bid(
+              self.expected_duration, assign_context.timestamp, event)
+      ]
+    else:
+      raise ValueError(
+          '%s can only assign windows to auctions and bids, but received %s' %
+          (self.__class__.__name__, event))
+
+  def merge(self, merge_context):
+    id_to_auction = {}

Review comment:
       nit: rename to auction_id_to_auction_window, auction_id_to_bid_window.

##########
File path: 
sdks/python/apache_beam/testing/benchmarks/nexmark/models/nexmark_model.py
##########
@@ -26,31 +26,55 @@
   - The bid on an item for auction (Bid).
 
 """
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.testing.benchmarks.nexmark import nexmark_util
+
+
+class PersonCoder(FastCoder):
+  def _create_impl(self):
+    return PersonCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
 
 
 class Person(object):
   "Author of an auction or a bid."
+  CODER = PersonCoder()
 
   def __init__(
-      self, id, name, email, credit_card, city, state, timestamp, extra=None):
+      self, id, name, email, credit_card, city, state, date_time, extra=None):
     self.id = id
     self.name = name
-    self.email = email  # key
+    self.email_address = email  # key
     self.credit_card = credit_card
     self.city = city
     self.state = state
-    self.timestamp = timestamp
+    self.date_time = date_time
     self.extra = extra
 
   def __repr__(self):
-    return 'Person({id}, {email})'.format(
-        **{
-            'id': self.id, 'email': self.email
-        })
+    return nexmark_util.model_to_json(self)
+
+
+class AuctionCoder(FastCoder):
+  def to_type_hint(self):

Review comment:
       Why do we need to override this and just pass here?

##########
File path: 
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):
+    return self.is_auction_window
+
+  def __str__(self):
+    return (
+        'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' 
%
+        (self.start, self.end, self.auction, self.is_auction_window))
+
+
+class AuctionOrBidWindowCoder(FastCoder):
+  def _create_impl(self):
+    return AuctionOrBidWindowCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
+
+
+class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl):
+  _super_coder_impl = coder_impl.IntervalWindowCoderImpl()
+  _id_coder_impl = coder_impl.VarIntCoderImpl()
+  _bool_coder_impl = coder_impl.BooleanCoderImpl()
+
+  def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested):
+    self._super_coder_impl.encode_to_stream(value, stream, True)
+    self._id_coder_impl.encode_to_stream(value.auction, stream, True)
+    self._bool_coder_impl.encode_to_stream(
+        value.is_auction_window, stream, True)
+
+  def decode_from_stream(self, stream, nested):
+    super_window = self._super_coder_impl.decode_from_stream(stream, True)
+    auction = self._id_coder_impl.decode_from_stream(stream, True)
+    is_auction = self._bool_coder_impl.decode_from_stream(stream, True)
+    return AuctionOrBidWindow(
+        super_window.start, super_window.end, auction, is_auction)
+
+
+class AuctionOrBidWindowFn(WindowFn):
+  def __init__(self, expected_duration_micro):
+    self.expected_duration = expected_duration_micro
+
+  def assign(self, assign_context):
+    event = assign_context.element
+    if isinstance(event, nexmark_model.Auction):
+      return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)]
+    elif isinstance(event, nexmark_model.Bid):
+      return [
+          AuctionOrBidWindow.for_bid(
+              self.expected_duration, assign_context.timestamp, event)
+      ]
+    else:
+      raise ValueError(
+          '%s can only assign windows to auctions and bids, but received %s' %
+          (self.__class__.__name__, event))
+
+  def merge(self, merge_context):
+    id_to_auction = {}
+    id_to_bid = {}
+    for window in merge_context.windows:
+      if window.is_auction_window_fn():
+        id_to_auction[window.auction] = window
+      else:
+        if window.auction in id_to_bid:
+          bid_windows = id_to_bid[window.auction]
+        else:
+          bid_windows = []
+          id_to_bid[window.auction] = bid_windows

Review comment:
       nit: you can do
   ```
   if window.auction not in id_to_bid:
     id_to_bid[window.auction] = []
   id_to_bid[window.auction].append(window)
   ```

##########
File path: 
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)
+
+  def is_auction_window_fn(self):
+    return self.is_auction_window
+
+  def __str__(self):
+    return (
+        'AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}' 
%
+        (self.start, self.end, self.auction, self.is_auction_window))
+
+
+class AuctionOrBidWindowCoder(FastCoder):
+  def _create_impl(self):
+    return AuctionOrBidWindowCoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
+
+
+class AuctionOrBidWindowCoderImpl(coder_impl.StreamCoderImpl):
+  _super_coder_impl = coder_impl.IntervalWindowCoderImpl()
+  _id_coder_impl = coder_impl.VarIntCoderImpl()
+  _bool_coder_impl = coder_impl.BooleanCoderImpl()
+
+  def encode_to_stream(self, value: AuctionOrBidWindow, stream, nested):
+    self._super_coder_impl.encode_to_stream(value, stream, True)
+    self._id_coder_impl.encode_to_stream(value.auction, stream, True)
+    self._bool_coder_impl.encode_to_stream(
+        value.is_auction_window, stream, True)
+
+  def decode_from_stream(self, stream, nested):
+    super_window = self._super_coder_impl.decode_from_stream(stream, True)
+    auction = self._id_coder_impl.decode_from_stream(stream, True)
+    is_auction = self._bool_coder_impl.decode_from_stream(stream, True)
+    return AuctionOrBidWindow(
+        super_window.start, super_window.end, auction, is_auction)
+
+
+class AuctionOrBidWindowFn(WindowFn):
+  def __init__(self, expected_duration_micro):
+    self.expected_duration = expected_duration_micro
+
+  def assign(self, assign_context):
+    event = assign_context.element
+    if isinstance(event, nexmark_model.Auction):
+      return [AuctionOrBidWindow.for_auction(assign_context.timestamp, event)]
+    elif isinstance(event, nexmark_model.Bid):
+      return [
+          AuctionOrBidWindow.for_bid(
+              self.expected_duration, assign_context.timestamp, event)
+      ]
+    else:
+      raise ValueError(
+          '%s can only assign windows to auctions and bids, but received %s' %
+          (self.__class__.__name__, event))
+
+  def merge(self, merge_context):
+    id_to_auction = {}
+    id_to_bid = {}
+    for window in merge_context.windows:
+      if window.is_auction_window_fn():
+        id_to_auction[window.auction] = window
+      else:
+        if window.auction in id_to_bid:
+          bid_windows = id_to_bid[window.auction]
+        else:
+          bid_windows = []
+          id_to_bid[window.auction] = bid_windows
+        bid_windows.append(window)
+
+    for auction, auction_window in id_to_auction.items():
+      bid_window_list = id_to_bid.get(auction)
+      if bid_window_list is not None:
+        to_merge = []
+        for bid_window in bid_window_list:
+          if bid_window.start < auction_window.end:
+            to_merge.append(bid_window)
+        if len(to_merge) > 0:
+          to_merge.append(auction_window)
+          merge_context.merge(to_merge, auction_window)
+
+  def get_window_coder(self):
+    return AuctionOrBidWindowCoder()
+
+  def get_transformed_output_time(self, window, input_timestamp):
+    return window.max_timestamp()
+
+
+class JoinAuctionBidFn(beam.DoFn):
+  @staticmethod
+  def higher_bid(bid, other):
+    if bid.price > other.price:
+      return True
+    elif bid.price < other.price:
+      return False
+    else:
+      return bid.date_time < other.date_time
+
+  def process(self, element):
+    _, group = element
+    auctions = group[nexmark_query_util.AUCTION_TAG]
+    auction = auctions[0] if auctions else None
+    if auction is None:
+      return
+    best_bid = None
+    for bid in group[nexmark_query_util.BID_TAG]:
+      if bid.price < auction.reserve:
+        continue
+      if best_bid is None or JoinAuctionBidFn.higher_bid(bid, best_bid):
+        best_bid = bid
+    if best_bid is None:
+      return

Review comment:
       ```
   if best_bid:
     yield ...
   ```

##########
File path: sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
##########
@@ -103,6 +121,108 @@ def process(self, elem):
     yield event
 
 
+class ParseJsonEvnetFn(beam.DoFn):
+  """Parses the raw event info into a Python objects.
+
+  Each event line has the following format:
+
+    person: {id,name,email,credit_card,city, \
+                          state,timestamp,extra}
+    auction: {id,item_name, description,initial_bid, \
+                          
reserve_price,timestamp,expires,seller,category,extra}
+    bid: {auction,bidder,price,timestamp,extra}
+
+  For example:
+
+    {"id":1000,"name":"Peter Jones","emailAddress":"n...@xcat.com",\
+        "creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",\
+        "dateTime":1528098831026,\"extra":"WN_HS_bnpVQ\\[["}
+
+    {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf",\
+        "initialBid":28873,"reserve":29448,"dateTime":1528098831036,\
+        "expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"}
+
+    {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,\
+        "extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"}
+  """
+  def process(self, elem):
+    json_dict = json.loads(elem)
+    if type(json_dict[FieldNames.DATE_TIME]) is dict:
+      json_dict[FieldNames.DATE_TIME] = json_dict[
+          FieldNames.DATE_TIME]['millis']
+    if FieldNames.NAME in json_dict:
+      yield nexmark_model.Person(
+          json_dict[FieldNames.ID],
+          json_dict[FieldNames.NAME],
+          json_dict[FieldNames.EMAIL_ADDRESS],
+          json_dict[FieldNames.CREDIT_CARD],
+          json_dict[FieldNames.CITY],
+          json_dict[FieldNames.STATE],
+          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
+          json_dict[FieldNames.EXTRA])
+    elif FieldNames.ITEM_NAME in json_dict:
+      if type(json_dict[FieldNames.EXPIRES]) is dict:
+        json_dict[FieldNames.EXPIRES] = json_dict[FieldNames.EXPIRES]['millis']
+      yield nexmark_model.Auction(
+          json_dict[FieldNames.ID],
+          json_dict[FieldNames.ITEM_NAME],
+          json_dict[FieldNames.DESCRIPTION],
+          json_dict[FieldNames.INITIAL_BID],
+          json_dict[FieldNames.RESERVE],
+          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
+          millis_to_timestamp(json_dict[FieldNames.EXPIRES]),
+          json_dict[FieldNames.SELLER],
+          json_dict[FieldNames.CATEGORY],
+          json_dict[FieldNames.EXTRA])
+    elif FieldNames.AUCTION in json_dict:
+      yield nexmark_model.Bid(
+          json_dict[FieldNames.AUCTION],
+          json_dict[FieldNames.BIDDER],
+          json_dict[FieldNames.PRICE],
+          millis_to_timestamp(json_dict[FieldNames.DATE_TIME]),
+          json_dict[FieldNames.EXTRA])
+    else:
+      raise ValueError('Invalid event: %s.' % str(json_dict))
+
+
+class CountAndLog(beam.PTransform):
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | 'window' >> beam.WindowInto(window.GlobalWindows())
+        | "Count" >> beam.combiners.Count.Globally()
+        | "Log" >> beam.Map(log_count_info))
+
+
+def log_count_info(count):
+  logging.info('Query resulted in %d results', count)
+  return count
+
+
 def display(elm):
   logging.debug(elm)
   return elm
+
+
+def model_to_json(model):
+  return json.dumps(construct_json_dict(model), separators=(',', ':'))
+
+
+def construct_json_dict(model):
+  return {k: unnest_to_json(v) for k, v in model.__dict__.items()}
+
+
+def unnest_to_json(cand):
+  if isinstance(cand, Timestamp):
+    return cand.micros // 1000
+  elif isinstance(
+      cand, (nexmark_model.Auction, nexmark_model.Bid, nexmark_model.Person)):
+    return construct_json_dict(cand)
+  else:
+    return cand
+
+
+def millis_to_timestamp(millis: int):
+  second = millis // 1000
+  micro_second = millis % 1000 * 1000
+  return Timestamp(second, micro_second)

Review comment:
       I think you can simply pass the micros converted from millis to Timestamp.

##########
File path: 
sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
##########
@@ -263,4 +279,4 @@ def run(self):
 if __name__ == '__main__':
   launcher = NexmarkLauncher()
   launcher.run()
-  launcher.cleanup()
+  # launcher.cleanup()

Review comment:
       I think this shouldn't be commented out.

##########
File path: 
sdks/python/apache_beam/testing/benchmarks/nexmark/queries/winning_bids.py
##########
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.coders import coder_impl
+from apache_beam.coders.coders import FastCoder
+from apache_beam.transforms.window import WindowFn
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
+from apache_beam.testing.benchmarks.nexmark.models import auction_bid
+from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
+
+
+class AuctionOrBidWindow(IntervalWindow):
+  """Windows for open auctions and bids."""
+  def __init__(self, start, end, auction_id, is_auction_window):
+    super(AuctionOrBidWindow, self).__init__(start, end)
+    self.auction = auction_id
+    self.is_auction_window = is_auction_window
+
+  @staticmethod
+  def for_auction(timestamp, auction: nexmark_model.Auction):
+    return AuctionOrBidWindow(timestamp, auction.expires, auction.id, True)
+
+  @staticmethod
+  def for_bid(expected_duration_micro, timestamp, bid: nexmark_model.Bid):
+    return AuctionOrBidWindow(
+        timestamp, timestamp + expected_duration_micro * 2, bid.auction, False)

Review comment:
       I think you need to be careful here: if timestamp is a Timestamp and
expected_duration_micro is an int, `+` will try to convert expected_duration_micro
* 2 with Duration.of(expected_duration_micro * 2), which by default treats
expected_duration_micro * 2 as a value in seconds. You probably want to do something
like timestamp + Duration(micros=expected_duration_micro * 2).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to