Hi Samarth,

I had a quick discussion with Philipp who initially came up with the idea of 
the MergeBySchema processor.
The current solution is quite similar to already existing process elements.
So I would ask you to implement it as follows:
- the processing element requires to input streams
- inside the onInvocation method both SchemaInfos are compared and if they 
don't match (are the same) a SpRuntimeException is thrown. Here I would say 
that two SchemaInfos are equal if their EventSchemas contain the same 
EventProperties. Ideally, we would already check the SchemaInfos in the UI 
component, but unfortunately we don't have corresponding UI components yet.
- the onEvent method simply forwards each event without any further 
manipulations

Sorry for the confusion about the problem to be solved, I should have written a 
clearer description of the processor.
Best
Tim
On März 31 2021, at 6:41 am, Samarth Sah <[email protected]> wrote:
> Hi Tim/Team,
>
> Please let me know the parameters to be used in the overriding
> equals method for SchemaInfo.
>
> For emitting of multiple events can we try the below logic:
> for (Event e0 : this.streamBufferS0.getList()) {
> boolean atLeastOneMerge=false;
> for (Event e1 : this.streamBufferS1.getList()) {
> if (e0.getSchemaInfo().equals(e1.getSchemaInfo())) {
> atLeastOneMerge=true;
> Event resultingEvent = mergeEvents(e0, e1);
> spOutputCollector.collect(resultingEvent);
> this.streamBufferS1.removeOldEvent(e1);
> }
> }
> if(atLeastOneMerge)
> this.streamBufferS1.removeOldEvent(e0);
> }
>
>
> Thank You
> Samarth
>
>
> On Tue, Mar 30, 2021 at 8:18 PM Samarth Sah <[email protected]> wrote:
> > Hi Tim,
> >
> > Yes, the idea is to check for each incoming event if there are already one
> > or more events in the StreamBuffer that can be merged with the incoming one.
> >
> > However I am not sure about emitting the events in case of multiple
> > matches.Do you want me to stop checking after one match?Since that event is
> > already merged.
> >
> > I will try to override the equals methods for SchemaInfo.
> >
> > Thank You,
> > Samarth
> >
> >
> >
> >
> >
> > On Tue, Mar 30, 2021 at 7:38 PM udeho <[email protected]> wrote:
> >
> >> Hi Samarth,
> >>
> >> welcome to our mailing list and thank you for your contribution!
> >> I just had a look at your PR.
> >> Just to get it right, it is your idea to check for each incoming event if
> >> there are already one or more events in the StreamBuffer that can be merged
> >> with the incoming one? And if there are multiple matches, multiple events
> >> are emitted?
> >> I'm not quite sure, whether we should provide the buffering per default
> >> or only by choice because leads to big confusion if it's not expected. At
> >> least, we should limit buffering to when a new event arrives for the
> >> buffered stream, so that un-synchronized streams can still be merged.
> >>
> >> @all, how would you handle the stream buffering?
> >> In the declare model you have to specify the two required inputStreams.
> >> For example this could look like:
> >>
> >> .requiredStream(StreamRequirementsBuilder
> >> .create()
> >> .requiredProperty(EpRequirements.anyProperty())
> >> .build())
> >> .requiredStream(StreamRequirementsBuilder
> >> .create()
> >> .requiredProperty(EpRequirements.anyProperty())
> >> .build())
> >> Also, the EventSchema does not yet override the equals method and
> >> therefore the comparison does not work properly. If you want, you can try
> >> to implement that as well.
> >>
> >> Apart from this general things, your provided code looks very good.
> >> There are some minor things that we can address once we have agreed on
> >> this point, if that's okay with you?
> >>
> >> Best
> >> Tim
> >>
> >> On März 29 2021, at 2:57 pm, Samarth Sah <[email protected]> wrote:
> >> > Hi All,
> >> >
> >> > I am new to Streampipes.
> >> > I have raised the pull request:
> >> > https://github.com/apache/incubator-streampipes-extensions/pull/39,
> >> > regarding the issue
> >> https://issues.apache.org/jira/browse/STREAMPIPES-323.
> >> >
> >> > Please let me know if I am in the right direction.
> >> > Thank you,
> >> > Samarth
> >> >
> >> >
> >> >
> >> > On Mon, Mar 29, 2021 at 6:04 PM GitBox <[email protected]> wrote:
> >> > >
> >> > > Samarth08 opened a new pull request #39:
> >> > > URL:
> >> https://github.com/apache/incubator-streampipes-extensions/pull/39
> >> > >
> >> > >
> >> > > Fixes: https://issues.apache.org/jira/browse/STREAMPIPES-323
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > This is an automated message from the Apache Git Service.
> >> > > To respond to the message, please log on to GitHub and use the
> >> > > URL above to go to the specific comment.
> >> > >
> >> > > For queries about this service, please contact Infrastructure at:
> >> > > [email protected]
> >> > >
> >> > >
> >> > >
> >> >
> >>
> >>
>

Reply via email to