Amar3tto commented on code in PR #22584:
URL: https://github.com/apache/beam/pull/22584#discussion_r938507548


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -83,46 +86,50 @@ public Plugin withConfig(PluginConfig pluginConfig) {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
-    PluginConfig pluginConfig = getPluginConfig();
-    checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
-    if (cdapPluginObj == null) {
-      try {
-        Constructor<?> constructor =
-            getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
-        constructor.setAccessible(true);
-        cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
-      } catch (Exception e) {
-        LOG.error("Can not instantiate CDAP plugin class", e);
-        throw new IllegalStateException("Can not call prepareRun");
-      }
-    }
-    try {
-      cdapPluginObj.prepareRun(getContext());
-      if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
-        for (Map.Entry<String, String> entry :
-            getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+    if (!isUnbounded()) {
+      PluginConfig pluginConfig = getPluginConfig();
+      checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
+      if (cdapPluginObj == null) {
+        try {
+          Constructor<?> constructor =
+              getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
+          constructor.setAccessible(true);
+          cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
+        } catch (Exception e) {
+          LOG.error("Can not instantiate CDAP plugin class", e);
+          throw new IllegalStateException("Can not call prepareRun");
         }
-      } else {
-        for (Map.Entry<String, String> entry :
-            getContext().getOutputFormatProvider().getOutputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+      }
+      try {
+        cdapPluginObj.prepareRun(getContext());

Review Comment:
   Fixed the try-catch block.
   I don't think we need a separate method for this, since our method is
   already called `prepareRun()`.



##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -83,46 +86,50 @@ public Plugin withConfig(PluginConfig pluginConfig) {
    * validating connection to the CDAP sink/source and performing initial tuning.
    */
   public void prepareRun() {
-    PluginConfig pluginConfig = getPluginConfig();
-    checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
-    if (cdapPluginObj == null) {
-      try {
-        Constructor<?> constructor =
-            getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
-        constructor.setAccessible(true);
-        cdapPluginObj = (SubmitterLifecycle) constructor.newInstance(pluginConfig);
-      } catch (Exception e) {
-        LOG.error("Can not instantiate CDAP plugin class", e);
-        throw new IllegalStateException("Can not call prepareRun");
-      }
-    }
-    try {
-      cdapPluginObj.prepareRun(getContext());
-      if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
-        for (Map.Entry<String, String> entry :
-            getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
-          getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+    if (!isUnbounded()) {
+      PluginConfig pluginConfig = getPluginConfig();
+      checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
+      if (cdapPluginObj == null) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
