Hi Tim,

I have raised a PR for overriding the equals method for schema Info.

https://github.com/apache/incubator-streampipes/pull/34

Please Check.

Thank You
Samarth

On Thu, Apr 1, 2021 at 6:28 PM udeho <[email protected]> wrote:

> Hi Samarth,
>
> I had a quick discussion with Philipp who initially came up with the idea
> of the MergeBySchema processor.
> The current solution is quite similar to already existing process elements.
> So I would ask you to implement it as follows:
> - the processing element requires to input streams
> - inside the onInvocation method both SchemaInfos are compared and if they
> don't match (are the same) a SpRuntimeException is thrown. Here I would say
> that two SchemaInfos are equal if their EventSchemas contain the same
> EventProperties. Ideally, we would already check the SchemaInfos in the UI
> component, but unfortunately we don't have corresponding UI components yet.
> - the onEvent method simply forwards each event without any further
> manipulations
>
> Sorry for the confusion about the problem to be solved, I should have
> written a clearer description of the processor.
> Best
> Tim
> On März 31 2021, at 6:41 am, Samarth Sah <[email protected]> wrote:
> > Hi Tim/Team,
> >
> > Please let me know the parameters to be used in the overriding
> > equals method for SchemaInfo.
> >
> > For emitting of multiple events can we try the below logic:
> > for (Event e0 : this.streamBufferS0.getList()) {
> > boolean atLeastOneMerge=false;
> > for (Event e1 : this.streamBufferS1.getList()) {
> > if (e0.getSchemaInfo().equals(e1.getSchemaInfo())) {
> > atLeastOneMerge=true;
> > Event resultingEvent = mergeEvents(e0, e1);
> > spOutputCollector.collect(resultingEvent);
> > this.streamBufferS1.removeOldEvent(e1);
> > }
> > }
> > if(atLeastOneMerge)
> > this.streamBufferS1.removeOldEvent(e0);
> > }
> >
> >
> > Thank You
> > Samarth
> >
> >
> > On Tue, Mar 30, 2021 at 8:18 PM Samarth Sah <[email protected]> wrote:
> > > Hi Tim,
> > >
> > > Yes, the idea is to check for each incoming event if there are already
> one
> > > or more events in the StreamBuffer that can be merged with the
> incoming one.
> > >
> > > However I am not sure about emitting the events in case of multiple
> > > matches.Do you want me to stop checking after one match?Since that
> event is
> > > already merged.
> > >
> > > I will try to override the equals methods for SchemaInfo.
> > >
> > > Thank You,
> > > Samarth
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Mar 30, 2021 at 7:38 PM udeho <[email protected]> wrote:
> > >
> > >> Hi Samarth,
> > >>
> > >> welcome to our mailing list and thank you for your contribution!
> > >> I just had a look at your PR.
> > >> Just to get it right, it is your idea to check for each incoming
> event if
> > >> there are already one or more events in the StreamBuffer that can be
> merged
> > >> with the incoming one? And if there are multiple matches, multiple
> events
> > >> are emitted?
> > >> I'm not quite sure, whether we should provide the buffering per
> default
> > >> or only by choice because leads to big confusion if it's not
> expected. At
> > >> least, we should limit buffering to when a new event arrives for the
> > >> buffered stream, so that un-synchronized streams can still be merged.
> > >>
> > >> @all, how would you handle the stream buffering?
> > >> In the declare model you have to specify the two required
> inputStreams.
> > >> For example this could look like:
> > >>
> > >> .requiredStream(StreamRequirementsBuilder
> > >> .create()
> > >> .requiredProperty(EpRequirements.anyProperty())
> > >> .build())
> > >> .requiredStream(StreamRequirementsBuilder
> > >> .create()
> > >> .requiredProperty(EpRequirements.anyProperty())
> > >> .build())
> > >> Also, the EventSchema does not yet override the equals method and
> > >> therefore the comparison does not work properly. If you want, you can
> try
> > >> to implement that as well.
> > >>
> > >> Apart from this general things, your provided code looks very good.
> > >> There are some minor things that we can address once we have agreed on
> > >> this point, if that's okay with you?
> > >>
> > >> Best
> > >> Tim
> > >>
> > >> On März 29 2021, at 2:57 pm, Samarth Sah <[email protected]> wrote:
> > >> > Hi All,
> > >> >
> > >> > I am new to Streampipes.
> > >> > I have raised the pull request:
> > >> > https://github.com/apache/incubator-streampipes-extensions/pull/39,
> > >> > regarding the issue
> > >> https://issues.apache.org/jira/browse/STREAMPIPES-323.
> > >> >
> > >> > Please let me know if I am in the right direction.
> > >> > Thank you,
> > >> > Samarth
> > >> >
> > >> >
> > >> >
> > >> > On Mon, Mar 29, 2021 at 6:04 PM GitBox <[email protected]> wrote:
> > >> > >
> > >> > > Samarth08 opened a new pull request #39:
> > >> > > URL:
> > >> https://github.com/apache/incubator-streampipes-extensions/pull/39
> > >> > >
> > >> > >
> > >> > > Fixes: https://issues.apache.org/jira/browse/STREAMPIPES-323
> > >> > >
> > >> > >
> > >> > >
> > >> > > --
> > >> > > This is an automated message from the Apache Git Service.
> > >> > > To respond to the message, please log on to GitHub and use the
> > >> > > URL above to go to the specific comment.
> > >> > >
> > >> > > For queries about this service, please contact Infrastructure at:
> > >> > > [email protected]
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> >
>
>

Reply via email to