Works for batch job, too. See enclosed.

On 08/04/2015 01:34 PM, Matthias J. Sax wrote:
> Yes, that is what the program does. However, streaming is not lazy, so
> deserialization should have happened.
>
> I will try a batch job later today.
>
> On 08/04/2015 01:27 PM, Chesnay Schepler wrote:
>> So I'm not too much into the streaming API, but as I see it this program
>> creates an infinite number of tuples and then counts them, right?
>>
>> The problem with serialization, as I understand it, is that the receiver
>> can't tell how many Tuple0 are sent, since you never actually read any
>> data when deserializing a tuple. It's even more likely that it's not
>> even attempted.
>>
>> As such, I'd be curious to see what happens when you create a batch job
>> with a limited number of starting tuples.
>>
>> On 04.08.2015 13:08, Matthias J. Sax wrote:
>>> Hi,
>>>
>>> I just opened a PR for this. https://github.com/apache/flink/pull/983
>>>
>>> However, I was not able to "reproduce" serialization issues... I tested
>>> Tuple0 (see enclosed code) in a cluster, and the program worked. Am I
>>> missing anything?
>>>
>>> -Matthias
>>>
>>> On 08/03/2015 01:01 AM, Matthias J. Sax wrote:
>>>> Thanks for the advice about Tuple0.
>>>>
>>>> I personally don't see any advantage in having a "flink-tuple" project.
>>>> Am I missing anything about it? Furthermore, I am not sure if it is a
>>>> good idea to have too many small projects.
>>>>
>>>> On 08/03/2015 12:48 AM, Stephan Ewen wrote:
>>>>> Tuple0 would need special serialization and comparator logic. If
>>>>> that is given, I see no reason not to support it.
>>>>>
>>>>> There is, BTW, a request to create a dedicated "flink-tuple" project
>>>>> that only contains the tuple classes. Any opinions on that?
>>>>>
>>>>> On Mon, Aug 3, 2015 at 12:45 AM, Matthias J. Sax <
>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>
>>>>>> Thanks for the explanation!
>>>>>>
>>>>>> As I mentioned before, Tuple0 might also be helpful for streaming.
>>>>>> And I guess I will need it for the Storm compatibility layer, too.
>>>>>> (I need to double check, but Storm supports zero-attribute tuples,
>>>>>> too.)
>>>>>>
>>>>>> With regard to the information I collected during the discussion, I
>>>>>> vote for keeping Tuple0 in Flink core and fixing the serialization
>>>>>> problem. Should we have another JIRA for this? Or should I extend
>>>>>> the existing JIRA?
>>>>>> (https://issues.apache.org/jira/browse/FLINK-2457)
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 08/03/2015 12:22 AM, Chesnay Schepler wrote:
>>>>>>> First of all, it was a really good idea to start a discussion
>>>>>>> about this.
>>>>>>>
>>>>>>> So the general idea behind Tuple0 was this:
>>>>>>>
>>>>>>> The Python API maps Python tuples to Flink tuples. Python can have
>>>>>>> empty tuples, so I thought "well duh, let's make a Tuple0 class!".
>>>>>>> What I did not wanna do is create some non-Tuple object to
>>>>>>> represent empty tuples; I'd rather have them treated the same,
>>>>>>> because it's less work and creates simpler code.
>>>>>>>
>>>>>>> When transferring the plan to Java, certain parameters for
>>>>>>> operations are tuples, which can be empty as well.
>>>>>>> This is where the Tuple0 class is really useful, because these
>>>>>>> empty tuples go through the same logic as other tuples.
>>>>>>> This is also why I want to keep the class, at least in the Python
>>>>>>> project, for now.
>>>>>>>
>>>>>>> For the actual program execution, I need a new solution. Funny
>>>>>>> story: while writing this reply I noticed that the Python API
>>>>>>> can't handle Tuple0 at runtime either. ha...ha... -.-
>>>>>>>
>>>>>>> Guess I now know what I'm working on next.
>>>>>>>
>>>>>>> On 02.08.2015 21:24, Matthias J. Sax wrote:
>>>>>>>> Can you elaborate how and why Python used Tuple0?
>>>>>>>> If it cannot be serialized like regular tuples, what is its use
>>>>>>>> in Python? Right now it seems as if there is no special
>>>>>>>> serialization code for Tuple0.
>>>>>>>>
>>>>>>>> I just want to understand the topic in detail.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 08/01/2015 03:38 PM, Stephan Ewen wrote:
>>>>>>>>> I think a Tuple0 cannot be implemented like the current tuples,
>>>>>>>>> at least with respect to runtime serialization.
>>>>>>>>>
>>>>>>>>> The system makes the assumption that it makes progress in
>>>>>>>>> consuming bytes when deserializing values. If a Tuple0 never
>>>>>>>>> consumes data from the byte stream, this assumption is broken.
>>>>>>>>> It would need at least one marker byte.
>>>>>>>>> Then it effectively is a Tuple1<Byte> disguising itself as a
>>>>>>>>> Tuple0.
>>>>>>>>>
>>>>>>>>> On Sat, Aug 1, 2015 at 1:38 PM, Matthias J. Sax <
>>>>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>>>>
>>>>>>>>>> I just double checked. Scala does not have a type Tuple0. IMHO,
>>>>>>>>>> it would be best to remove Tuple0 for consistency. Having Tuple
>>>>>>>>>> types is for consistency with Scala in the first place, right?
>>>>>>>>>> Please give feedback.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 08/01/2015 01:04 PM, Matthias J. Sax wrote:
>>>>>>>>>>> I see.
>>>>>>>>>>>
>>>>>>>>>>> I think that it might be useful to have Tuple0, because in
>>>>>>>>>>> rare cases you only want to "notify" a downstream operator
>>>>>>>>>>> (talking about streaming) that something happened but there is
>>>>>>>>>>> no actual data to be processed. Furthermore, if Flink cannot
>>>>>>>>>>> deal with Tuple0 it should be removed completely for
>>>>>>>>>>> consistency IMHO.
>>>>>>>>>>>
>>>>>>>>>>> I will open a JIRA for it.
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 07/31/2015 10:44 PM, Chesnay Schepler wrote:
>>>>>>>>>>>> Also, I'm not sure if I ever sent a Tuple0 through a
>>>>>>>>>>>> program; it could be that the system freaks out.
>>>>>>>>>>>>
>>>>>>>>>>>> On 31.07.2015 22:40, Chesnay Schepler wrote:
>>>>>>>>>>>>> There's no specific reason. It was added fairly recently
>>>>>>>>>>>>> by me (mid-April), and you're most likely the second
>>>>>>>>>>>>> person to use it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I didn't integrate it into all our tuple-related stuff
>>>>>>>>>>>>> because, well, I never thought anyone would actually need
>>>>>>>>>>>>> it, so I saved myself the trouble.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there any specific reason why
>>>>>>>>>>>>>> Tuple.getTupleClass(int arity) does not support arity
>>>>>>>>>>>>>> zero? There is a class Tuple0, but it cannot be
>>>>>>>>>>>>>> generated by Tuple.getTupleClass(...). Is it a missing
>>>>>>>>>>>>>> feature? (I would like to have it.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
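
Stephan's point in the thread, that a deserializer which consumes no bytes gives the receiver nothing to count, can be illustrated without Flink. The following is a minimal plain-Java sketch under stated assumptions: the class `EmptyTupleMarkerDemo`, its methods, and the marker value are hypothetical names made up for illustration, and it models a bare byte stream rather than Flink's actual serializers or network stack (which, as the batch test in this thread suggests, may behave differently).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/** Plain-Java illustration only; none of these names exist in Flink. */
final class EmptyTupleMarkerDemo {

    // Hypothetical marker value written once per empty tuple.
    static final int MARKER = 42;

    /** Writes `count` empty tuples, optionally emitting one marker byte per tuple. */
    static byte[] serialize(int count, boolean withMarker) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < count; i++) {
            if (withMarker) {
                out.write(MARKER);
            }
            // Without a marker, an empty tuple contributes no bytes at all.
        }
        return out.toByteArray();
    }

    /** Counts records by consuming one byte per record until the stream is exhausted. */
    static int countRecords(byte[] data) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        int records = 0;
        while (in.read() != -1) {
            records++;
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println("without marker: " + countRecords(serialize(20, false)));
        System.out.println("with marker:    " + countRecords(serialize(20, true)));
    }
}
```

In this sketch the reader recovers 0 records without the marker and 20 with it, which is the "Tuple1<Byte> disguising itself as a Tuple0" trade-off described above.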
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.stormcompatibility.wordcount;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple0;

public class TestEmptyTupleBatch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Bounded input: 20 empty tuples.
		DataSource<Tuple0> input = env.fromElements(
				new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
				new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
				new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
				new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0());

		// Run the mapper with parallelism 4 and print its results.
		input.map(new Counter()).setParallelism(4).print();
	}

	/** Logs a running per-instance count for each received tuple and emits the tuple's arity. */
	public static class Counter implements MapFunction<Tuple0, Integer> {
		private static final long serialVersionUID = 5518823010274195186L;

		int counter = 0;

		@Override
		public Integer map(Tuple0 value) throws Exception {
			System.out.println("mjsax: " + (++counter) + " " + value.getField(0));
			return value.getArity();
		}
	}
}