This is an automated email from the ASF dual-hosted git repository.

kbeedkar pushed a commit to branch rel/0.7.1
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


The following commit(s) were added to refs/heads/rel/0.7.1 by this push:
     new 3675bdbd Modified the JavaTextFileSource so that a remote file can be 
loaded via HTTPS protocol (or HTTP)
     new 654848eb Merge pull request #348 from kamir/rel/0.7.1
3675bdbd is described below

commit 3675bdbd7034fc004b7c3253802fc3a77726f0c7
Author: Mirko Kämpf <[email protected]>
AuthorDate: Tue Oct 10 13:31:43 2023 +0200

    Modified the JavaTextFileSource so that a remote file can be loaded via 
HTTPS protocol (or HTTP)
---
 .../wayang/java/operators/JavaTextFileSource.java  |  61 ++++++--
 .../java/operators/JavaTextFileSourceTest.java     | 156 +++++++++++++++++++++
 .../src/test/resources/banking-tx-small.csv        |  63 +++++++++
 3 files changed, 271 insertions(+), 9 deletions(-)

diff --git 
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java
 
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java
index 6d8577f6..fad925a1 100644
--- 
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java
+++ 
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java
@@ -35,11 +35,17 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.*;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.stream.Stream;
 
 /**
  * This is execution operator implements the {@link TextFileSource}.
@@ -65,20 +71,57 @@ public class JavaTextFileSource extends TextFileSource 
implements JavaExecutionO
             ChannelInstance[] outputs,
             JavaExecutor javaExecutor,
             OptimizationContext.OperatorContext operatorContext) {
+
         assert inputs.length == this.getNumInputs();
         assert outputs.length == this.getNumOutputs();
 
-        String url = this.getInputUrl().trim();
-        FileSystem fs = FileSystems.getFileSystem(url).orElseThrow(
-                () -> new WayangException(String.format("Cannot access file 
system of %s.", url))
-        );
+        String urlStr = this.getInputUrl().trim();
 
         try {
-            final InputStream inputStream = fs.open(url);
-            Stream<String> lines = new BufferedReader(new 
InputStreamReader(inputStream)).lines();
-            ((StreamChannel.Instance) outputs[0]).accept(lines);
-        } catch (IOException e) {
-            throw new WayangException(String.format("Reading %s failed.", 
url), e);
+
+            URL sourceUrl = new URL( urlStr );
+            String protocol = sourceUrl.getProtocol();
+
+            if ( protocol.startsWith("https") || protocol.startsWith("http")  
) {
+                try {
+                    HttpURLConnection connection = (HttpURLConnection) 
sourceUrl.openConnection();
+                    connection.setRequestMethod("GET");
+                    // Check if the response code indicates success (HTTP 
status code 200)
+                    if (connection.getResponseCode() == 
HttpURLConnection.HTTP_OK) {
+                        System.out.println(">>> Ready to stream the data from 
URL: " + urlStr);
+                        // Read the data line by line and process it in the 
StreamChannel
+                        Stream<String> lines2 = new BufferedReader(new 
InputStreamReader(connection.getInputStream())).lines();
+                        ((StreamChannel.Instance) outputs[0]).accept(lines2);
+                    }
+                }
+                catch (IOException ioException) {
+                    ioException.printStackTrace();
+                    throw new WayangException(String.format("Reading from URL: 
%s failed.", urlStr), ioException);
+                }
+            }
+            else if ( sourceUrl.getProtocol().startsWith("file") ) {
+
+                FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow(
+                        () -> new WayangException(String.format("Cannot access 
file system of %s. ", urlStr))
+                );
+
+                try {
+                    final InputStream inputStream = fs.open(urlStr);
+                    Stream<String> lines = new BufferedReader(new 
InputStreamReader(inputStream)).lines();
+                    ((StreamChannel.Instance) outputs[0]).accept(lines);
+                }
+                catch (IOException ioException) {
+                    ioException.printStackTrace();
+                    throw new WayangException(String.format("Reading from 
FILE: %s failed.", urlStr), ioException);
+                }
+            }
+            else {
+                throw new WayangException(String.format("PROTOCOL NOT 
SUPPORTED IN JavaTextFileSource. [%s] [%s] SUPPORTED ARE: (file|http|https)", 
urlStr, protocol));
+            }
+        }
+        catch (MalformedURLException e) {
+            e.printStackTrace();
+            throw new WayangException(String.format("Provided URL is not a 
valid URL: %s (MalformedURLException)", urlStr), e);
         }
 
         ExecutionLineageNode prepareLineageNode = new 
ExecutionLineageNode(operatorContext);
diff --git 
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java
 
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java
new file mode 100644
index 00000000..304a209b
--- /dev/null
+++ 
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.Job;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.OutputSlot;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.fs.LocalFileSystem;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test suite for {@link JavaTextFileSource}.
+ */
+public class JavaTextFileSourceTest extends JavaExecutionOperatorTestBase {
+
+    private Locale defaultLocale;
+
+    /**
+     * In locales where the decimal separator is not ".", this test would fail.
+     * Therefore we ensure it's run in a pre-defined locale and we make sure 
it's
+     * reset after the test.
+     */
+    @Before
+    public void setupTest() {
+        defaultLocale = Locale.getDefault();
+        Locale.setDefault(Locale.US);
+    }
+
+    @After
+    public void teardownTest() {
+        Locale.setDefault(defaultLocale);
+    }
+
+    @Test
+    public void testReadLocalFile() throws IOException, URISyntaxException {
+        final String testFileName = "/banking-tx-small.csv";
+
+        JavaExecutor javaExecutor = null;
+        try {
+            // Prepare the source.
+            final URL inputUrl = this.getClass().getResource(testFileName);
+            System.out.println( "*** " + inputUrl + "***");
+            JavaTextFileSource source = new JavaTextFileSource(
+                    inputUrl.toString() );
+
+            // Execute.
+            JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
+            JavaChannelInstance[] outputs = new 
JavaChannelInstance[]{createStreamChannelInstance()};
+            evaluate(source, inputs, outputs);
+
+            // Verify the outcome.
+            final List<String> result = 
outputs[0].<String>provideStream().collect(Collectors.toList());
+            Assert.assertEquals(63, result.size());
+        } finally {
+            if (javaExecutor != null) javaExecutor.dispose();
+        }
+    }
+
+    @Test
+    /**
+     * Requires a local HTTP Server running, in the project root ...
+     *
+     * With Python 3: python -m http.server
+     * With Python 2: python -m SimpleHTTPServer
+     */
+    public void testReadRemoteFileHTTP() throws IOException, 
URISyntaxException {
+        final String testFileURL = "http://localhost:8000/LICENSE";;
+
+        JavaExecutor javaExecutor = null;
+        try {
+            // Prepare the source.
+            final URL inputUrl = new URL(testFileURL);
+            System.out.println( "*** " + inputUrl + "***");
+            JavaTextFileSource source = new JavaTextFileSource(
+                    inputUrl.toString() );
+
+            // Execute.
+            JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
+            JavaChannelInstance[] outputs = new 
JavaChannelInstance[]{createStreamChannelInstance()};
+            evaluate(source, inputs, outputs);
+
+            // Verify the outcome.
+            final List<String> result = 
outputs[0].<String>provideStream().collect(Collectors.toList());
+            Assert.assertEquals(225, result.size());
+        } finally {
+            if (javaExecutor != null) javaExecutor.dispose();
+        }
+    }
+
+    @Test
+    public void testReadRemoteFileHTTPS() throws IOException, 
URISyntaxException {
+        final String testFileURL = 
"https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile/profile2.ttl";;
+
+        JavaExecutor javaExecutor = null;
+        try {
+            // Prepare the source.
+            final URL inputUrl = new URL(testFileURL);
+            System.out.println( "*** " + inputUrl + "***");
+            JavaTextFileSource source = new JavaTextFileSource(
+                    inputUrl.toString() );
+
+            // Execute.
+            JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
+            JavaChannelInstance[] outputs = new 
JavaChannelInstance[]{createStreamChannelInstance()};
+            evaluate(source, inputs, outputs);
+
+            // Verify the outcome.
+            final List<String> result = 
outputs[0].<String>provideStream().collect(Collectors.toList());
+            Assert.assertEquals(23, result.size());
+        } finally {
+            if (javaExecutor != null) javaExecutor.dispose();
+        }
+
+    }
+}
diff --git 
a/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv 
b/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv
new file mode 100644
index 00000000..7b3757dd
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv
@@ -0,0 +1,63 @@
+Account ID,Date,Transaction Type,Description,Amount,Currency
+Account123,2023-08-01,Deposit,Salary Deposit,5000.00,USD
+Account123,2023-08-02,Withdrawal,ATM Withdrawal,100.00,USD
+Account123,2023-08-03,Transfer,Transfer to Savings,1000.00,USD
+Account123,2023-08-04,Payment,Electric Bill Payment,75.00,USD
+Account123,2023-08-05,Deposit,Dividend Payment,200.00,USD
+Account123,2023-08-06,Withdrawal,ATM Withdrawal,50.00,USD
+Account123,2023-08-07,Transfer,Transfer to Checking,300.00,USD
+Account123,2023-08-08,Payment,Internet Bill Payment,50.00,USD
+Account123,2023-08-09,Deposit,Bonus Deposit,1000.00,USD
+Account123,2023-08-10,Withdrawal,ATM Withdrawal,200.00,USD
+Account123,2023-08-11,Transfer,Transfer to Savings,500.00,USD
+Account123,2023-08-12,Payment,Gas Bill Payment,60.00,USD
+Account123,2023-08-13,Deposit,Interest Payment,150.00,USD
+Account123,2023-08-14,Withdrawal,ATM Withdrawal,75.00,USD
+Account123,2023-08-15,Transfer,Transfer to Checking,200.00,USD
+Account123,2023-08-16,Payment,Phone Bill Payment,40.00,USD
+Account123,2023-08-17,Deposit,Salary Deposit,5500.00,USD
+Account123,2023-08-18,Withdrawal,ATM Withdrawal,150.00,USD
+Account123,2023-08-19,Transfer,Transfer to Savings,1000.00,USD
+Account123,2023-08-20,Payment,Water Bill Payment,45.00,USD
+Account123,2023-08-21,Deposit,Dividend Payment,250.00,USD
+Account123,2023-08-22,Withdrawal,ATM Withdrawal,75.00,USD
+Account123,2023-08-23,Transfer,Transfer to Checking,400.00,USD
+Account123,2023-08-24,Payment,Electric Bill Payment,80.00,USD
+Account123,2023-08-25,Deposit,Bonus Deposit,1200.00,USD
+Account123,2023-08-26,Withdrawal,ATM Withdrawal,100.00,USD
+Account123,2023-08-27,Transfer,Transfer to Savings,700.00,USD
+Account123,2023-08-28,Payment,Internet Bill Payment,60.00,USD
+Account123,2023-08-29,Deposit,Interest Payment,180.00,USD
+Account123,2023-08-30,Withdrawal,ATM Withdrawal,50.00,USD
+Account123,2023-08-31,Transfer,Transfer to Checking,350.00,USD
+Account456,2023-08-01,Deposit,Salary Deposit,5500.00,USD
+Account456,2023-08-02,Withdrawal,ATM Withdrawal,200.00,USD
+Account456,2023-08-03,Transfer,Transfer to Savings,1500.00,USD
+Account456,2023-08-04,Payment,Electric Bill Payment,120.00,USD
+Account456,2023-08-05,Deposit,Dividend Payment,300.00,USD
+Account456,2023-08-06,Withdrawal,ATM Withdrawal,75.00,USD
+Account456,2023-08-07,Transfer,Transfer to Checking,400.00,USD
+Account456,2023-08-08,Payment,Internet Bill Payment,60.00,USD
+Account456,2023-08-09,Deposit,Bonus Deposit,1500.00,USD
+Account456,2023-08-10,Withdrawal,ATM Withdrawal,300.00,USD
+Account456,2023-08-11,Transfer,Transfer to Savings,1000.00,USD
+Account456,2023-08-12,Payment,Gas Bill Payment,80.00,USD
+Account456,2023-08-13,Deposit,Interest Payment,200.00,USD
+Account456,2023-08-14,Withdrawal,ATM Withdrawal,100.00,USD
+Account456,2023-08-15,Transfer,Transfer to Checking,500.00,USD
+Account456,2023-08-16,Payment,Phone Bill Payment,40.00,USD
+Account456,2023-08-17,Deposit,Salary Deposit,6000.00,USD
+Account456,2023-08-18,Withdrawal,ATM Withdrawal,250.00,USD
+Account456,2023-08-19,Transfer,Transfer to Savings,1200.00,USD
+Account456,2023-08-20,Payment,Water Bill Payment,60.00,USD
+Account456,2023-08-21,Deposit,Dividend Payment,350.00,USD
+Account456,2023-08-22,Withdrawal,ATM Withdrawal,100.00,USD
+Account456,2023-08-23,Transfer,Transfer to Checking,600.00,USD
+Account456,2023-08-24,Payment,Electric Bill Payment,100.00,USD
+Account456,2023-08-25,Deposit,Bonus Deposit,1800.00,USD
+Account456,2023-08-26,Withdrawal,ATM Withdrawal,150.00,USD
+Account456,2023-08-27,Transfer,Transfer to Savings,800.00,USD
+Account456,2023-08-28,Payment,Internet Bill Payment,80.00,USD
+Account456,2023-08-29,Deposit,Interest Payment,250.00,USD
+Account456,2023-08-30,Withdrawal,ATM Withdrawal,60.00,USD
+Account456,2023-08-31,Transfer,Transfer to Checking,700.00,USD
\ No newline at end of file

Reply via email to