Hi Tim, I have raised a PR for overriding the equals method for schema Info.
https://github.com/apache/incubator-streampipes/pull/34 Please Check. Thank You Samarth On Thu, Apr 1, 2021 at 6:28 PM udeho <[email protected]> wrote: > Hi Samarth, > > I had a quick discussion with Philipp who initially came up with the idea > of the MergeBySchema processor. > The current solution is quite similar to already existing process elements. > So I would ask you to implement it as follows: > - the processing element requires to input streams > - inside the onInvocation method both SchemaInfos are compared and if they > don't match (are the same) a SpRuntimeException is thrown. Here I would say > that two SchemaInfos are equal if their EventSchemas contain the same > EventProperties. Ideally, we would already check the SchemaInfos in the UI > component, but unfortunately we don't have corresponding UI components yet. > - the onEvent method simply forwards each event without any further > manipulations > > Sorry for the confusion about the problem to be solved, I should have > written a clearer description of the processor. > Best > Tim > On März 31 2021, at 6:41 am, Samarth Sah <[email protected]> wrote: > > Hi Tim/Team, > > > > Please let me know the parameters to be used in the overriding > > equals method for SchemaInfo. > > > > For emitting of multiple events can we try the below logic: > > for (Event e0 : this.streamBufferS0.getList()) { > > boolean atLeastOneMerge=false; > > for (Event e1 : this.streamBufferS1.getList()) { > > if (e0.getSchemaInfo().equals(e1.getSchemaInfo())) { > > atLeastOneMerge=true; > > Event resultingEvent = mergeEvents(e0, e1); > > spOutputCollector.collect(resultingEvent); > > this.streamBufferS1.removeOldEvent(e1); > > } > > } > > if(atLeastOneMerge) > > this.streamBufferS1.removeOldEvent(e0); > > } > > > > > > Thank You > > Samarth > > > > > > On Tue, Mar 30, 2021 at 8:18 PM Samarth Sah <[email protected]> wrote: > > > Hi Tim, > > > > > > Yes, the idea is to check for each incoming event if there are already > one > > > or more events in the StreamBuffer that can be merged with the > incoming one. > > > > > > However I am not sure about emitting the events in case of multiple > > > matches.Do you want me to stop checking after one match?Since that > event is > > > already merged. > > > > > > I will try to override the equals methods for SchemaInfo. > > > > > > Thank You, > > > Samarth > > > > > > > > > > > > > > > > > > On Tue, Mar 30, 2021 at 7:38 PM udeho <[email protected]> wrote: > > > > > >> Hi Samarth, > > >> > > >> welcome to our mailing list and thank you for your contribution! > > >> I just had a look at your PR. > > >> Just to get it right, it is your idea to check for each incoming > event if > > >> there are already one or more events in the StreamBuffer that can be > merged > > >> with the incoming one? And if there are multiple matches, multiple > events > > >> are emitted? > > >> I'm not quite sure, whether we should provide the buffering per > default > > >> or only by choice because leads to big confusion if it's not > expected. At > > >> least, we should limit buffering to when a new event arrives for the > > >> buffered stream, so that un-synchronized streams can still be merged. > > >> > > >> @all, how would you handle the stream buffering? > > >> In the declare model you have to specify the two required > inputStreams. > > >> For example this could look like: > > >> > > >> .requiredStream(StreamRequirementsBuilder > > >> .create() > > >> .requiredProperty(EpRequirements.anyProperty()) > > >> .build()) > > >> .requiredStream(StreamRequirementsBuilder > > >> .create() > > >> .requiredProperty(EpRequirements.anyProperty()) > > >> .build()) > > >> Also, the EventSchema does not yet override the equals method and > > >> therefore the comparison does not work properly. If you want, you can > try > > >> to implement that as well. > > >> > > >> Apart from this general things, your provided code looks very good. > > >> There are some minor things that we can address once we have agreed on > > >> this point, if that's okay with you? > > >> > > >> Best > > >> Tim > > >> > > >> On März 29 2021, at 2:57 pm, Samarth Sah <[email protected]> wrote: > > >> > Hi All, > > >> > > > >> > I am new to Streampipes. > > >> > I have raised the pull request: > > >> > https://github.com/apache/incubator-streampipes-extensions/pull/39, > > >> > regarding the issue > > >> https://issues.apache.org/jira/browse/STREAMPIPES-323. > > >> > > > >> > Please let me know if I am in the right direction. > > >> > Thank you, > > >> > Samarth > > >> > > > >> > > > >> > > > >> > On Mon, Mar 29, 2021 at 6:04 PM GitBox <[email protected]> wrote: > > >> > > > > >> > > Samarth08 opened a new pull request #39: > > >> > > URL: > > >> https://github.com/apache/incubator-streampipes-extensions/pull/39 > > >> > > > > >> > > > > >> > > Fixes: https://issues.apache.org/jira/browse/STREAMPIPES-323 > > >> > > > > >> > > > > >> > > > > >> > > -- > > >> > > This is an automated message from the Apache Git Service. > > >> > > To respond to the message, please log on to GitHub and use the > > >> > > URL above to go to the specific comment. > > >> > > > > >> > > For queries about this service, please contact Infrastructure at: > > >> > > [email protected] > > >> > > > > >> > > > > >> > > > > >> > > > >> > > >> > > > >
