[ 
https://issues.apache.org/jira/browse/FLINK-3782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenguang He updated FLINK-3782:
--------------------------------
    Description: 



@Test
        public void testSerializability() {
                try {
                        Collection<ElementType> inputCollection = new 
ArrayList<ElementType>();
                        ElementType element1 = new ElementType(1);
                        ElementType element2 = new ElementType(2);
                        ElementType element3 = new ElementType(3);
                        inputCollection.add(element1);
                        inputCollection.add(element2);
                        inputCollection.add(element3);
        
                        @SuppressWarnings("unchecked")
                        TypeInformation<ElementType> info = 
(TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
        
                        CollectionInputFormat<ElementType> inputFormat = new 
CollectionInputFormat<ElementType>(inputCollection,
                                        info.createSerializer(new 
ExecutionConfig()));

                        ByteArrayOutputStream buffer = new 
ByteArrayOutputStream();
                        ObjectOutputStream out = new ObjectOutputStream(buffer);

                        out.writeObject(inputFormat);

                        ObjectInputStream in = new ObjectInputStream(new 
ByteArrayInputStream(buffer.toByteArray()));

                        Object serializationResult = in.readObject();

                        assertNotNull(serializationResult);
                        assertTrue(serializationResult instanceof 
CollectionInputFormat<?>);

                        @SuppressWarnings("unchecked")
                        CollectionInputFormat<ElementType> result = 
(CollectionInputFormat<ElementType>) serializationResult;

                        GenericInputSplit inputSplit = new GenericInputSplit(0, 
1);
                        inputFormat.open(inputSplit);
                        result.open(inputSplit);
                        while(!inputFormat.reachedEnd() && 
!result.reachedEnd()){
                                ElementType expectedElement = 
inputFormat.nextRecord(null);
                                ElementType actualElement = 
result.nextRecord(null);

                                assertEquals(expectedElement, actualElement);
                        }
                }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.toString());
                }
        }

  was:
@Test
        public void testSerializability() {
                try {
                        Collection<ElementType> inputCollection = new 
ArrayList<ElementType>();
                        ElementType element1 = new ElementType(1);
                        ElementType element2 = new ElementType(2);
                        ElementType element3 = new ElementType(3);
                        inputCollection.add(element1);
                        inputCollection.add(element2);
                        inputCollection.add(element3);
        
                        @SuppressWarnings("unchecked")
                        TypeInformation<ElementType> info = 
(TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
        
                        CollectionInputFormat<ElementType> inputFormat = new 
CollectionInputFormat<ElementType>(inputCollection,
                                        info.createSerializer(new 
ExecutionConfig()));

                        ByteArrayOutputStream buffer = new 
ByteArrayOutputStream();
                        ObjectOutputStream out = new ObjectOutputStream(buffer);

                        out.writeObject(inputFormat);

                        ObjectInputStream in = new ObjectInputStream(new 
ByteArrayInputStream(buffer.toByteArray()));

                        Object serializationResult = in.readObject();

                        assertNotNull(serializationResult);
                        assertTrue(serializationResult instanceof 
CollectionInputFormat<?>);

                        @SuppressWarnings("unchecked")
                        CollectionInputFormat<ElementType> result = 
(CollectionInputFormat<ElementType>) serializationResult;

                        GenericInputSplit inputSplit = new GenericInputSplit(0, 
1);
                        inputFormat.open(inputSplit);
                        result.open(inputSplit);
                        while(!inputFormat.reachedEnd() && 
!result.reachedEnd()){
                                ElementType expectedElement = 
inputFormat.nextRecord(null);
                                ElementType actualElement = 
result.nextRecord(null);

                                assertEquals(expectedElement, actualElement);
                        }
                }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.toString());
                }
        }


> ByteArrayOutputStream and ObjectOutputStream should close
> ---------------------------------------------------------
>
>                 Key: FLINK-3782
>                 URL: https://issues.apache.org/jira/browse/FLINK-3782
>             Project: Flink
>          Issue Type: Test
>          Components: Java API
>    Affects Versions: 1.0.1
>            Reporter: Chenguang He
>            Priority: Minor
>              Labels: test
>
> @Test
>       public void testSerializability() {
>               try {
>                       Collection<ElementType> inputCollection = new 
> ArrayList<ElementType>();
>                       ElementType element1 = new ElementType(1);
>                       ElementType element2 = new ElementType(2);
>                       ElementType element3 = new ElementType(3);
>                       inputCollection.add(element1);
>                       inputCollection.add(element2);
>                       inputCollection.add(element3);
>       
>                       @SuppressWarnings("unchecked")
>                       TypeInformation<ElementType> info = 
> (TypeInformation<ElementType>) 
> TypeExtractor.createTypeInfo(ElementType.class);
>       
>                       CollectionInputFormat<ElementType> inputFormat = new 
> CollectionInputFormat<ElementType>(inputCollection,
>                                       info.createSerializer(new 
> ExecutionConfig()));
>                       ByteArrayOutputStream buffer = new 
> ByteArrayOutputStream();
>                       ObjectOutputStream out = new ObjectOutputStream(buffer);
>                       out.writeObject(inputFormat);
>                       ObjectInputStream in = new ObjectInputStream(new 
> ByteArrayInputStream(buffer.toByteArray()));
>                       Object serializationResult = in.readObject();
>                       assertNotNull(serializationResult);
>                       assertTrue(serializationResult instanceof 
> CollectionInputFormat<?>);
>                       @SuppressWarnings("unchecked")
>                       CollectionInputFormat<ElementType> result = 
> (CollectionInputFormat<ElementType>) serializationResult;
>                       GenericInputSplit inputSplit = new GenericInputSplit(0, 
> 1);
>                       inputFormat.open(inputSplit);
>                       result.open(inputSplit);
>                       while(!inputFormat.reachedEnd() && 
> !result.reachedEnd()){
>                               ElementType expectedElement = 
> inputFormat.nextRecord(null);
>                               ElementType actualElement = 
> result.nextRecord(null);
>                               assertEquals(expectedElement, actualElement);
>                       }
>               }
>               catch(Exception e) {
>                       e.printStackTrace();
>                       fail(e.toString());
>               }
>       }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to