scwhittle commented on a change in pull request #13318:
URL: https://github.com/apache/beam/pull/13318#discussion_r526017646



##########
File path: 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -115,19 +109,23 @@ public Query3(NexmarkConfiguration configuration) {
                         "OR".equals(person.state)
                             || "ID".equals(person.state)
                             || "CA".equals(person.state)))
+            .apply(

Review comment:
       ditto

##########
File path: 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -197,85 +195,60 @@ private JoinDoFn(String name, int maxAuctionsWaitingTime) 
{
     @ProcessElement
     public void processElement(
         ProcessContext c,
-        @TimerId(PERSON_STATE_EXPIRING) Timer timer,
-        @StateId(PERSON) ValueState<Person> personState,
-        @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
+        @TimerId(STATE_EXPIRING) Timer timer,
+        @StateId(PERSON) @AlwaysFetched ValueState<Person> personState,
+        @StateId(AUCTIONS) BagState<Auction> auctionsState) {
       // We would *almost* implement this by  rewindowing into the global 
window and
       // running a combiner over the result. The combiner's accumulator would 
be the
       // state we use below. However, combiners cannot emit intermediate 
results, thus
-      // we need to wait for the pending ReduceFn API.
+      // we need to wait for the pending ReduceFn API
 
       Person existingPerson = personState.read();
-      if (existingPerson != null) {
-        // We've already seen the new person event for this person id.
-        // We can join with any new auctions on-the-fly without needing any
-        // additional persistent state.
-        for (Auction newAuction : 
c.element().getValue().getAll(NexmarkQueryUtil.AUCTION_TAG)) {
-          newAuctionCounter.inc();
-          newOldOutputCounter.inc();
-          c.output(KV.of(newAuction, existingPerson));
-        }
-        return;
-      }
-
-      Person theNewPerson = null;
-      for (Person newPerson : 
c.element().getValue().getAll(NexmarkQueryUtil.PERSON_TAG)) {
-        if (theNewPerson == null) {
-          theNewPerson = newPerson;
+      if (c.element().getValue().newPerson != null) {

Review comment:
       done

##########
File path: 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -197,85 +195,60 @@ private JoinDoFn(String name, int maxAuctionsWaitingTime) 
{
     @ProcessElement
     public void processElement(
         ProcessContext c,
-        @TimerId(PERSON_STATE_EXPIRING) Timer timer,
-        @StateId(PERSON) ValueState<Person> personState,
-        @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
+        @TimerId(STATE_EXPIRING) Timer timer,
+        @StateId(PERSON) @AlwaysFetched ValueState<Person> personState,
+        @StateId(AUCTIONS) BagState<Auction> auctionsState) {
       // We would *almost* implement this by  rewindowing into the global 
window and
       // running a combiner over the result. The combiner's accumulator would 
be the
       // state we use below. However, combiners cannot emit intermediate 
results, thus
-      // we need to wait for the pending ReduceFn API.
+      // we need to wait for the pending ReduceFn API
 
       Person existingPerson = personState.read();
-      if (existingPerson != null) {
-        // We've already seen the new person event for this person id.
-        // We can join with any new auctions on-the-fly without needing any
-        // additional persistent state.
-        for (Auction newAuction : 
c.element().getValue().getAll(NexmarkQueryUtil.AUCTION_TAG)) {
-          newAuctionCounter.inc();
-          newOldOutputCounter.inc();
-          c.output(KV.of(newAuction, existingPerson));
-        }
-        return;
-      }
-
-      Person theNewPerson = null;
-      for (Person newPerson : 
c.element().getValue().getAll(NexmarkQueryUtil.PERSON_TAG)) {
-        if (theNewPerson == null) {
-          theNewPerson = newPerson;
+      if (c.element().getValue().newPerson != null) {
+        Person person = c.element().getValue().newPerson;
+        if (existingPerson == null) {
+          newPersonCounter.inc();
+          personState.write(person);
         } else {
-          if (theNewPerson.equals(newPerson)) {
-            LOG.error("Duplicate person {}", theNewPerson);
+          if (person.equals(existingPerson)) {
+            LOG.error("Duplicate person {}", person);
           } else {
-            LOG.error("Conflicting persons {} and {}", theNewPerson, 
newPerson);
+            LOG.error("Conflicting persons {} and {}", existingPerson, person);
           }
           fatalCounter.inc();
-          continue;
         }
-        newPersonCounter.inc();
         // We've now seen the person for this person id so can flush any
         // pending auctions for the same seller id (an auction is done by only 
one seller).

Review comment:
       done

##########
File path: 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -83,27 +75,29 @@ public Query3(NexmarkConfiguration configuration) {
 
   @Override
   public PCollection<NameCityStateId> expand(PCollection<Event> events) {
-    int numEventsInPane = 30;
-
-    PCollection<Event> eventsWindowed =
-        events.apply(
-            Window.<Event>into(new GlobalWindows())
-                
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(numEventsInPane)))
-                .discardingFiredPanes()
-                .withAllowedLateness(Duration.ZERO));
-    PCollection<KV<Long, Auction>> auctionsBySellerId =
-        eventsWindowed
+    PCollection<KV<Long, Event>> auctionsBySellerId =
+        events
             // Only want the new auction events.
             .apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS)
 
             // We only want auctions in category 10.
             .apply(name + ".InCategory", Filter.by(auction -> auction.category 
== 10))
 
             // Key auctions by their seller id.
-            .apply("AuctionBySeller", NexmarkQueryUtil.AUCTION_BY_SELLER);
-
-    PCollection<KV<Long, Person>> personsById =
-        eventsWindowed
+            .apply(

Review comment:
       This doesn't work because we would also need to change it to the union Event type.
   
   Also, Auction::seller gives the error "Objects is not a functional interface". 
   
   So I kept it as is. Let me know if you'd rather use WithKeys followed by a ParDo 
that converts each auction into an event. If so, do you have suggestions for fixing 
the error? My Java is out of date, so I'm not sure how to fix it off the top of 
my head.

##########
File path: 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -197,85 +195,60 @@ private JoinDoFn(String name, int maxAuctionsWaitingTime) 
{
     @ProcessElement
     public void processElement(
         ProcessContext c,

Review comment:
       done

##########
File path: 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
##########
@@ -53,6 +56,48 @@
 public class Query5 extends NexmarkQueryTransform<AuctionCount> {
   private final NexmarkConfiguration configuration;
 
+  public static class TopCombineFn
+      extends AccumulatingCombineFn<KV<Long, Long>, Accum, KV<Long, 
List<Long>>> {
+    @Override
+    public Accum createAccumulator() {
+      return new Accum();
+    }
+
+    @DefaultCoder(AvroCoder.class)

Review comment:
       done with tweaks

##########
File path: 
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -164,21 +165,19 @@ public void processElement(ProcessContext c) {
     @StateId(PERSON)
     private static final StateSpec<ValueState<Person>> personSpec = 
StateSpecs.value(Person.CODER);
 
-    private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
+    private static final String STATE_EXPIRING = "stateExpiring";
 
     @StateId(AUCTIONS)
-    private final StateSpec<ValueState<List<Auction>>> auctionsSpec =
-        StateSpecs.value(ListCoder.of(Auction.CODER));
+    private final StateSpec<BagState<Auction>> auctionsSpec = 
StateSpecs.bag(Auction.CODER);
 
-    @TimerId(PERSON_STATE_EXPIRING)
-    private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+    @TimerId(STATE_EXPIRING)

Review comment:
       Processing time is closer to Flink's background GC, which is why I changed 
it. The previous code never garbage-collected auctions that had no matching person, 
which seemed incorrect. If we want to use an event-time timer, it seems we would 
need to keep some additional state to track the highest event time of the buffered 
auctions or persons; otherwise we could reset the timer inappropriately for late 
events. What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to